You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/26 14:54:29 UTC

[1/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 922683eb8 -> 740358c13


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index eafcd9f..289c156 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -59,10 +59,12 @@ import org.apache.carbondata.core.util.CarbonMergerUtil;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileWriter;
 import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.mdkeygen.file.FileData;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
@@ -137,8 +139,20 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * size reserved in one file for writing block meta data. It will be in percentage
    */
   private int spaceReservedForBlockMetaSize;
-  private FileOutputStream fileOutputStream;
-  private List<BlockIndexInfo> blockIndexInfoList;
+
+  protected FileOutputStream fileOutputStream;
+
+  protected List<BlockIndexInfo> blockIndexInfoList;
+
+  /**
+   * list of blocklet metadata entries for the V3 format
+   */
+  protected List<BlockletInfo3> blockletMetadata;
+
+  /**
+   * list of blocklet indexes
+   */
+  protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex;
 
   public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) {
     this.dataWriterVo = dataWriterVo;
@@ -187,6 +201,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)));
     this.dataChunksOffsets = new ArrayList<>();
     this.dataChunksLength = new ArrayList<>();
+    blockletMetadata = new ArrayList<BlockletInfo3>();
+    blockletIndex = new ArrayList<>();
   }
 
   /**
@@ -242,12 +258,14 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
       LOGGER.info("Writing data to file as max file size reached for file: " + fileName
           + " .Data block size: " + currentFileSize);
       // write meta data to end of the existing file
-      writeBlockletInfoToFile(blockletInfoList, fileChannel, fileName);
+      writeBlockletInfoToFile(fileChannel, fileName);
       this.currentFileSize = 0;
       blockletInfoList =
           new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
       this.dataChunksOffsets = new ArrayList<>();
       this.dataChunksLength = new ArrayList<>();
+      this.blockletMetadata = new ArrayList<>();
+      this.blockletIndex = new ArrayList<>();
       CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
       // rename carbon data file from in progress status to actual
       renameCarbonDataFile();
@@ -316,7 +334,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   /**
    * This method will write metadata at the end of file file format in thrift format
    */
-  protected abstract void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList,
+  protected abstract void writeBlockletInfoToFile(
       FileChannel channel, String filePath) throws CarbonDataWriterException;
 
   /**
@@ -327,19 +345,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @param filePath        file path
    * @param currentPosition current offset
    */
-  protected void fillBlockIndexInfoDetails(List<BlockletInfoColumnar> infoList, long numberOfRows,
+  protected void fillBlockIndexInfoDetails(long numberOfRows,
       String filePath, long currentPosition) {
 
     // as min-max will change for each blocklet and second blocklet min-max can be lesser than
     // the first blocklet so we need to calculate the complete block level min-max by taking
     // the min value of each column and max value of each column
-    byte[][] currentMinValue = infoList.get(0).getColumnMinData().clone();
-    byte[][] currentMaxValue = infoList.get(0).getColumnMaxData().clone();
+    byte[][] currentMinValue = blockletInfoList.get(0).getColumnMinData().clone();
+    byte[][] currentMaxValue = blockletInfoList.get(0).getColumnMaxData().clone();
     byte[][] minValue = null;
     byte[][] maxValue = null;
-    for (int i = 1; i < infoList.size(); i++) {
-      minValue = infoList.get(i).getColumnMinData();
-      maxValue = infoList.get(i).getColumnMaxData();
+    for (int i = 1; i < blockletInfoList.size(); i++) {
+      minValue = blockletInfoList.get(i).getColumnMinData();
+      maxValue = blockletInfoList.get(i).getColumnMaxData();
       for (int j = 0; j < maxValue.length; j++) {
         if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue[j]) > 0) {
           currentMinValue[j] = minValue[j].clone();
@@ -352,8 +370,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     // start and end key we can take based on first blocklet
     // start key will be the block start key as
     // it is the least key and end blocklet end key will be the block end key as it is the max key
-    BlockletBTreeIndex btree = new BlockletBTreeIndex(infoList.get(0).getStartKey(),
-        infoList.get(infoList.size() - 1).getEndKey());
+    BlockletBTreeIndex btree = new BlockletBTreeIndex(blockletInfoList.get(0).getStartKey(),
+        blockletInfoList.get(blockletInfoList.size() - 1).getEndKey());
     BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
     minmax.setMinValues(currentMinValue);
     minmax.setMaxValues(currentMaxValue);
@@ -414,7 +432,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @throws IOException               throws io exception if any problem while writing
    * @throws CarbonDataWriterException data writing
    */
-  private void writeIndexFile() throws IOException, CarbonDataWriterException {
+  protected void writeIndexFile() throws IOException, CarbonDataWriterException {
     // get the header
     IndexHeader indexHeader = CarbonMetadataUtil
         .getIndexHeader(localCardinality, thriftColumnSchemaList, dataWriterVo.getBucketNumber());
@@ -444,7 +462,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    *
    * @throws CarbonDataWriterException
    */
-  private void closeExecutorService() throws CarbonDataWriterException {
+  protected void closeExecutorService() throws CarbonDataWriterException {
     executorService.shutdown();
     try {
       executorService.awaitTermination(2, TimeUnit.HOURS);
@@ -467,7 +485,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    *
    * @throws CarbonDataWriterException
    */
-  private void renameCarbonDataFile() throws CarbonDataWriterException {
+  protected void renameCarbonDataFile() throws CarbonDataWriterException {
     File origFile = new File(this.fileName.substring(0, this.fileName.lastIndexOf('.')));
     File curFile = new File(this.fileName);
     if (!curFile.renameTo(origFile)) {
@@ -481,7 +499,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @param localFileName local file name with full path
    * @throws CarbonDataWriterException
    */
-  private void copyCarbonDataFileToCarbonStorePath(String localFileName)
+  protected void copyCarbonDataFileToCarbonStorePath(String localFileName)
       throws CarbonDataWriterException {
     long copyStartTime = System.currentTimeMillis();
     LOGGER.info("Copying " + localFileName + " --> " + dataWriterVo.getCarbonDataDirectoryPath());
@@ -537,7 +555,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
     if (this.blockletInfoList.size() > 0) {
-      writeBlockletInfoToFile(this.blockletInfoList, fileChannel, fileName);
+      writeBlockletInfoToFile(fileChannel, fileName);
     }
   }
 
@@ -551,7 +569,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
 
-  protected byte[][] fillAndCompressedKeyBlockData(IndexStorage<int[]>[] keyStorageArray,
+  protected byte[][] fillAndCompressedKeyBlockData(IndexStorage[] keyStorageArray,
       int entryCount) {
     byte[][] keyBlockData = new byte[keyStorageArray.length][];
     int destPos = 0;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index ddcdfcd..94d2727 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.store.writer;
 
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 public interface CarbonFactDataWriter<T> {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
deleted file mode 100644
index 3b90869..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/NodeHolder.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.writer;
-
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-
-public class NodeHolder {
-  /**
-   * keyArray
-   */
-  private byte[][] keyArray;
-
-  /**
-   * dataArray
-   */
-  private byte[][] dataArray;
-
-  /**
-   * measureLenght
-   */
-  private int[] measureLenght;
-
-  /**
-   * startKey
-   */
-  private byte[] startKey;
-
-  /**
-   * endKey
-   */
-  private byte[] endKey;
-
-  /**
-   * entryCount
-   */
-  private int entryCount;
-  /**
-   * keyLenghts
-   */
-  private int[] keyLengths;
-
-  /**
-   * dataAfterCompression
-   */
-  private short[][] dataAfterCompression;
-
-  /**
-   * indexMap
-   */
-  private short[][] indexMap;
-
-  /**
-   * keyIndexBlockLenght
-   */
-  private int[] keyBlockIndexLength;
-
-  /**
-   * isSortedKeyBlock
-   */
-  private boolean[] isSortedKeyBlock;
-
-  private byte[][] compressedIndex;
-
-  private byte[][] compressedIndexMap;
-
-  /**
-   * dataIndexMap
-   */
-  private int[] dataIndexMapLength;
-
-  /**
-   * dataIndexMap
-   */
-  private int[] dataIndexMapOffsets;
-
-  /**
-   * compressedDataIndex
-   */
-  private byte[][] compressedDataIndex;
-
-  /**
-   * column max data
-   */
-  private byte[][] columnMaxData;
-
-  /**
-   * column min data
-   */
-  private byte[][] columnMinData;
-
-  /**
-   * compression model for numbers data block.
-   */
-  private WriterCompressModel compressionModel;
-
-  /**
-   * array of aggBlocks flag to identify the aggBlocks
-   */
-  private boolean[] aggBlocks;
-
-  /**
-   * all columns max value
-   */
-  private byte[][] allMaxValue;
-
-  /**
-   * all column max value
-   */
-  private byte[][] allMinValue;
-
-  /**
-   * true if given index is colgroup block
-   */
-  private boolean[] colGrpBlock;
-
-  /**
-   * bit set which will holds the measure
-   * indexes which are null
-   */
-  private BitSet[] measureNullValueIndex;
-
-  /**
-   * total length of dimension values
-   */
-  private int totalDimensionArrayLength;
-
-  /**
-   * total length of all measure values
-   */
-  private int totalMeasureArrayLength;
-
-  /**
-   * @return the keyArray
-   */
-  public byte[][] getKeyArray() {
-    return keyArray;
-  }
-
-  /**
-   * @param keyArray the keyArray to set
-   */
-  public void setKeyArray(byte[][] keyArray) {
-    this.keyArray = keyArray;
-  }
-
-  /**
-   * @return the dataArray
-   */
-  public byte[][] getDataArray() {
-    return dataArray;
-  }
-
-  /**
-   * @param dataArray the dataArray to set
-   */
-  public void setDataArray(byte[][] dataArray) {
-    this.dataArray = dataArray;
-  }
-
-  /**
-   * @return the measureLenght
-   */
-  public int[] getMeasureLenght() {
-    return measureLenght;
-  }
-
-  /**
-   * @param measureLenght the measureLenght to set
-   */
-  public void setMeasureLenght(int[] measureLenght) {
-    this.measureLenght = measureLenght;
-  }
-
-  /**
-   * @return the startKey
-   */
-  public byte[] getStartKey() {
-    return startKey;
-  }
-
-  /**
-   * @param startKey the startKey to set
-   */
-  public void setStartKey(byte[] startKey) {
-    this.startKey = startKey;
-  }
-
-  /**
-   * @return the endKey
-   */
-  public byte[] getEndKey() {
-    return endKey;
-  }
-
-  /**
-   * @param endKey the endKey to set
-   */
-  public void setEndKey(byte[] endKey) {
-    this.endKey = endKey;
-  }
-
-  /**
-   * @return the entryCount
-   */
-  public int getEntryCount() {
-    return entryCount;
-  }
-
-  /**
-   * @param entryCount the entryCount to set
-   */
-  public void setEntryCount(int entryCount) {
-    this.entryCount = entryCount;
-  }
-
-  /**
-   * @return the keyLenghts
-   */
-  public int[] getKeyLengths() {
-    return keyLengths;
-  }
-
-  public void setKeyLengths(int[] keyLengths) {
-    this.keyLengths = keyLengths;
-  }
-
-  /**
-   * @return the keyBlockIndexLength
-   */
-  public int[] getKeyBlockIndexLength() {
-    return keyBlockIndexLength;
-  }
-
-  /**
-   * @param keyBlockIndexLength the keyBlockIndexLength to set
-   */
-  public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
-    this.keyBlockIndexLength = keyBlockIndexLength;
-  }
-
-  /**
-   * @return the isSortedKeyBlock
-   */
-  public boolean[] getIsSortedKeyBlock() {
-    return isSortedKeyBlock;
-  }
-
-  /**
-   * @param isSortedKeyBlock the isSortedKeyBlock to set
-   */
-  public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
-    this.isSortedKeyBlock = isSortedKeyBlock;
-  }
-
-  /**
-   * @return the compressedIndexex
-   */
-  public byte[][] getCompressedIndex() {
-    return compressedIndex;
-  }
-
-  public void setCompressedIndex(byte[][] compressedIndex) {
-    this.compressedIndex = compressedIndex;
-  }
-
-  /**
-   * @return the compressedIndexMap
-   */
-  public byte[][] getCompressedIndexMap() {
-    return compressedIndexMap;
-  }
-
-  /**
-   * @param compressedIndexMap the compressedIndexMap to set
-   */
-  public void setCompressedIndexMap(byte[][] compressedIndexMap) {
-    this.compressedIndexMap = compressedIndexMap;
-  }
-
-  /**
-   * @return the compressedDataIndex
-   */
-  public byte[][] getCompressedDataIndex() {
-    return compressedDataIndex;
-  }
-
-  /**
-   * @param compressedDataIndex the compressedDataIndex to set
-   */
-  public void setCompressedDataIndex(byte[][] compressedDataIndex) {
-    this.compressedDataIndex = compressedDataIndex;
-  }
-
-  /**
-   * @return the dataIndexMapLength
-   */
-  public int[] getDataIndexMapLength() {
-    return dataIndexMapLength;
-  }
-
-  /**
-   * @param dataIndexMapLength the dataIndexMapLength to set
-   */
-  public void setDataIndexMapLength(int[] dataIndexMapLength) {
-    this.dataIndexMapLength = dataIndexMapLength;
-  }
-
-  public byte[][] getColumnMaxData() {
-    return this.columnMaxData;
-  }
-
-  public void setColumnMaxData(byte[][] columnMaxData) {
-    this.columnMaxData = columnMaxData;
-  }
-
-  public byte[][] getColumnMinData() {
-    return this.columnMinData;
-  }
-
-  public void setColumnMinData(byte[][] columnMinData) {
-    this.columnMinData = columnMinData;
-  }
-
-  public WriterCompressModel getCompressionModel() {
-    return compressionModel;
-  }
-
-  public void setCompressionModel(WriterCompressModel compressionModel) {
-    this.compressionModel = compressionModel;
-  }
-
-  /**
-   * returns array of aggBlocks flag to identify the aag blocks
-   *
-   * @return
-   */
-  public boolean[] getAggBlocks() {
-    return aggBlocks;
-  }
-
-  /**
-   * set array of aggBlocks flag to identify the aggBlocks
-   *
-   * @param aggBlocks
-   */
-  public void setAggBlocks(boolean[] aggBlocks) {
-    this.aggBlocks = aggBlocks;
-  }
-
-  /**
-   * @return
-   */
-  public boolean[] getColGrpBlocks() {
-    return this.colGrpBlock;
-  }
-
-  /**
-   * @param colGrpBlock true if block is column group
-   */
-  public void setColGrpBlocks(boolean[] colGrpBlock) {
-    this.colGrpBlock = colGrpBlock;
-  }
-
-  /**
-   * @return the measureNullValueIndex
-   */
-  public BitSet[] getMeasureNullValueIndex() {
-    return measureNullValueIndex;
-  }
-
-  /**
-   * @param measureNullValueIndex the measureNullValueIndex to set
-   */
-  public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
-    this.measureNullValueIndex = measureNullValueIndex;
-  }
-
-  public int getTotalDimensionArrayLength() {
-    return totalDimensionArrayLength;
-  }
-
-  public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
-    this.totalDimensionArrayLength = totalDimensionArrayLength;
-  }
-
-  public int getTotalMeasureArrayLength() {
-    return totalMeasureArrayLength;
-  }
-
-  public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
-    this.totalMeasureArrayLength = totalMeasureArrayLength;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
index b7f8b4f..3218a51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.store.writer.v1;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -29,12 +28,12 @@ import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
 import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
@@ -363,15 +362,15 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
   /**
    * This method will write metadata at the end of file file format in thrift format
    */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+  protected void writeBlockletInfoToFile(FileChannel channel,
       String filePath) throws CarbonDataWriterException {
     try {
       long currentPosition = channel.size();
       CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
       FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFileFooter(infoList, localCardinality.length, localCardinality,
+          .convertFileFooter(blockletInfoList, localCardinality.length, localCardinality,
               thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
       writer.writeFooter(convertFileMeta, currentPosition);
     } catch (IOException e) {
       throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
index 185d98d..a9c4ce0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -30,11 +30,11 @@ import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.writer.CarbonFooterWriter;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
 
@@ -265,7 +265,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
   /**
    * This method will write metadata at the end of file file format in thrift format
    */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+  protected void writeBlockletInfoToFile(FileChannel channel,
       String filePath) throws CarbonDataWriterException {
     try {
       // get the current file position
@@ -273,10 +273,10 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
       CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
       // get thrift file footer instance
       FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+          .convertFilterFooter2(blockletInfoList, localCardinality, thriftColumnSchemaList,
               dataChunksOffsets, dataChunksLength);
       // fill the carbon index details
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      fillBlockIndexInfoDetails(convertFileMeta.getNum_rows(), filePath, currentPosition);
       // write the footer
       writer.writeFooter(convertFileMeta, currentPosition);
     } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
new file mode 100644
index 0000000..fa7bf27
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex;
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NodeHolder;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.BlockletInfo3;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * Below class will be used to write the data in V3 format
+ */
+public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]> {
+
+  /**
+   * number of pages in one column
+   */
+  private int numberOfChunksInBlocklet;
+
+  /**
+   * persist the page data to be written in the file
+   */
+  private DataWriterHolder dataWriterHolder;
+
+  /**
+   * Creates the V3 writer: reads the configured number of pages per blocklet column
+   * and prepares an empty holder for the pages waiting to be written
+   *
+   * @param dataWriterVo writer configuration, forwarded to the parent writer
+   */
+  public CarbonFactDataWriterImplV3(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
+    String pagesPerBlocklet = CarbonProperties.getInstance()
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+            CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+    this.numberOfChunksInBlocklet = Integer.parseInt(pagesPerBlocklet);
+    this.dataWriterHolder = new DataWriterHolder();
+  }
+
+  /**
+   * Below method will be used to build the node holder object
+   * This node holder object will be used to persist data which will
+   * be written in carbon data file
+   *
+   * @param keyStorageArray      index storage of each dimension column
+   * @param dataArray            measure data, one byte array per measure
+   * @param entryCount           number of rows held by the node
+   * @param startKey             dictionary start key
+   * @param endKey               dictionary end key
+   * @param compressionModel     supplies the min and max value of every measure
+   * @param noDictionaryStartKey no dictionary start key, null is treated as empty
+   * @param noDictionaryEndKey   no dictionary end key, null is treated as empty
+   * @return node holder filled with the column data and its metadata
+   * @throws CarbonDataWriterException if the data index map of a block flagged in
+   *                                   getAggBlocks() cannot be converted to bytes
+   */
+  @Override public NodeHolder buildDataNodeHolder(IndexStorage<short[]>[] keyStorageArray,
+      byte[][] dataArray, int entryCount, byte[] startKey, byte[] endKey,
+      WriterCompressModel compressionModel, byte[] noDictionaryStartKey, byte[] noDictionaryEndKey)
+      throws CarbonDataWriterException {
+    // if there are no NO-Dictionary column present in the table then
+    // set the empty byte array
+    if (null == noDictionaryEndKey) {
+      noDictionaryEndKey = new byte[0];
+    }
+    if (null == noDictionaryStartKey) {
+      noDictionaryStartKey = new byte[0];
+    }
+    // total measure length;
+    int totalMsrArrySize = 0;
+    int totalKeySize = 0;
+    int keyBlockSize = 0;
+
+    boolean[] isSortedData = new boolean[keyStorageArray.length];
+    int[] keyLengths = new int[keyStorageArray.length];
+
+    // below arrays hold the min and max value of each column,
+    // the index is the column index
+    byte[][] dimensionMinValue = new byte[keyStorageArray.length][];
+    byte[][] dimensionMaxValue = new byte[keyStorageArray.length][];
+
+    byte[][] measureMinValue = new byte[dataArray.length][];
+    byte[][] measureMaxValue = new byte[dataArray.length][];
+
+    byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
+    boolean[] colGrpBlock = new boolean[keyStorageArray.length];
+
+    for (int i = 0; i < keyLengths.length; i++) {
+      keyLengths[i] = keyBlockData[i].length;
+      isSortedData[i] = keyStorageArray[i].isAlreadySorted();
+      keyBlockSize++;
+      totalKeySize += keyLengths[i];
+      if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
+        dimensionMinValue[i] = keyStorageArray[i].getMin();
+        dimensionMaxValue[i] = keyStorageArray[i].getMax();
+      } else {
+        dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+        dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+      }
+      // if keyStorageArray is instance of ColGroupBlockStorage than it's
+      // colGroup chunk
+      if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
+        colGrpBlock[i] = true;
+      }
+    }
+    for (int i = 0; i < dataArray.length; i++) {
+      measureMaxValue[i] = CarbonMetadataUtil
+          .getByteValueForMeasure(compressionModel.getMaxValue()[i],
+              dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
+      measureMinValue[i] = CarbonMetadataUtil
+          .getByteValueForMeasure(compressionModel.getMinValue()[i],
+              dataWriterVo.getSegmentProperties().getMeasures().get(i).getDataType());
+    }
+    int[] keyBlockIdxLengths = new int[keyBlockSize];
+    byte[][] dataAfterCompression = new byte[keyBlockSize][];
+    byte[][] indexMap = new byte[keyBlockSize][];
+    for (int i = 0; i < isSortedData.length; i++) {
+      if (!isSortedData[i]) {
+        dataAfterCompression[i] = getByteArray(keyStorageArray[i].getDataAfterComp());
+        if (null != keyStorageArray[i].getIndexMap()
+            && keyStorageArray[i].getIndexMap().length > 0) {
+          indexMap[i] = getByteArray(keyStorageArray[i].getIndexMap());
+        } else {
+          indexMap[i] = new byte[0];
+        }
+        // the additional int is the length field written in front of the index data
+        keyBlockIdxLengths[i] = (dataAfterCompression[i].length + indexMap[i].length)
+            + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      }
+    }
+    byte[][] compressedDataIndex = new byte[keyBlockSize][];
+    int[] dataIndexMapLength = new int[keyBlockSize];
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
+        try {
+          compressedDataIndex[i] = getByteArray(keyStorageArray[i].getDataIndexMap());
+          dataIndexMapLength[i] = compressedDataIndex[i].length;
+        } catch (Exception e) {
+          // keep the original exception as cause so that the stack trace is not lost
+          throw new CarbonDataWriterException(e.getMessage(), e);
+        }
+      }
+    }
+    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
+    // calculate the total size required for all the measure and get the
+    // each measure size
+    for (int i = 0; i < dataArray.length; i++) {
+      msrLength[i] = dataArray[i].length;
+      totalMsrArrySize += msrLength[i];
+    }
+    NodeHolder holder = new NodeHolder();
+    holder.setDataArray(dataArray);
+    holder.setKeyArray(keyBlockData);
+    // end key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
+    ByteBuffer buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + endKey.length + updatedNoDictionaryEndKey.length);
+    buffer.putInt(endKey.length);
+    buffer.putInt(updatedNoDictionaryEndKey.length);
+    buffer.put(endKey);
+    buffer.put(updatedNoDictionaryEndKey);
+    buffer.rewind();
+    holder.setEndKey(buffer.array());
+    holder.setMeasureLenght(msrLength);
+    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
+    // start key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + startKey.length + updatedNoDictionaryStartKey.length);
+    buffer.putInt(startKey.length);
+    buffer.putInt(updatedNoDictionaryStartKey.length);
+    buffer.put(startKey);
+    buffer.put(updatedNoDictionaryStartKey);
+    buffer.rewind();
+    holder.setStartKey(buffer.array());
+    holder.setEntryCount(entryCount);
+    holder.setKeyLengths(keyLengths);
+    holder.setKeyBlockIndexLength(keyBlockIdxLengths);
+    holder.setIsSortedKeyBlock(isSortedData);
+    holder.setCompressedIndex(dataAfterCompression);
+    holder.setCompressedIndexMap(indexMap);
+    holder.setDataIndexMapLength(dataIndexMapLength);
+    holder.setCompressedDataIndex(compressedDataIndex);
+    holder.setCompressionModel(compressionModel);
+    holder.setTotalDimensionArrayLength(totalKeySize);
+    holder.setTotalMeasureArrayLength(totalMsrArrySize);
+    holder.setMeasureColumnMaxData(measureMaxValue);
+    holder.setMeasureColumnMinData(measureMinValue);
+    // setting column min max value
+    holder.setColumnMaxData(dimensionMaxValue);
+    holder.setColumnMinData(dimensionMinValue);
+    holder.setAggBlocks(dataWriterVo.getAggBlocks());
+    holder.setColGrpBlocks(colGrpBlock);
+    return holder;
+  }
+
+  /**
+   * Below method will be used to convert short array to byte array
+   * Each short is written as two bytes in the default byte order of ByteBuffer (big endian)
+   *
+   * @param data in short data
+   * @return byte array of length data.length * 2
+   */
+  private byte[] getByteArray(short[] data) {
+    ByteBuffer buffer = ByteBuffer.allocate(data.length * 2);
+    // the counter must be an int: a short counter wraps to a negative value when the
+    // array has more than Short.MAX_VALUE elements and the array access then fails
+    for (int i = 0; i < data.length; i++) {
+      buffer.putShort(data[i]);
+    }
+    buffer.flip();
+    return buffer.array();
+  }
+
+  /**
+   * Below method will be used to write the file footer at the current end of the file
+   * and to record the block index details of the file
+   *
+   * @param channel  channel of the carbon data file, its size is the footer offset
+   * @param filePath path of the carbon data file
+   * @throws CarbonDataWriterException if an IOException occurs while writing the footer
+   */
+  @Override protected void writeBlockletInfoToFile(FileChannel channel, String filePath)
+      throws CarbonDataWriterException {
+    try {
+      // the footer starts where the data written so far ends
+      long footerOffset = channel.size();
+      CarbonFooterWriter footerWriter = new CarbonFooterWriter(filePath);
+      // build the thrift footer from the blocklet metadata collected so far
+      FileFooter footer = CarbonMetadataUtil
+          .convertFileFooter3(blockletMetadata, blockletIndex, localCardinality,
+              thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+      long rowCount = footer.getNum_rows();
+      // the carbon index details are filled before the footer is written
+      fillBlockIndexInfoDetails(rowCount, filePath, footerOffset);
+      footerWriter.writeFooter(footer, footerOffset);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+    }
+  }
+
+  /**
+   * Below method will be used to add one page to the data holder. If the holder
+   * already has the configured number of pages, those pages are written to the
+   * file before the new page is added
+   *
+   * @param holder page data to be added
+   * @throws CarbonDataWriterException declared by the overridden method
+   */
+  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    int pagesInHolder = dataWriterHolder.getNumberOfPagesAdded();
+    if (pagesInHolder == numberOfChunksInBlocklet) {
+      writeDataToFile(fileChannel);
+    }
+    dataWriterHolder.addNodeHolder(holder);
+  }
+
+  /**
+   * Below method will be used to serialize the data chunk of every dimension and
+   * measure column for the pages present in the data holder, write them together
+   * with the page data to the file and clear the data holder
+   *
+   * @param channel not read by this method, the data is written through fileChannel
+   */
+  private void writeDataToFile(FileChannel channel) {
+    List<NodeHolder> pages = dataWriterHolder.getNodeHolder();
+    NodeHolder firstPage = pages.get(0);
+    int dimensionCount = firstPage.getKeyArray().length;
+    int measureCount = firstPage.getDataArray().length;
+    // dimension data chunks are stored first, measure data chunks follow them
+    byte[][] dataChunkBytes = new byte[dimensionCount + measureCount][];
+    long blockletDataSize = 0;
+    try {
+      for (int column = 0; column < dataChunkBytes.length; column++) {
+        boolean isDimension = column < dimensionCount;
+        // ordinal of the column within its own group (dimension or measure)
+        int ordinal = isDimension ? column : column - dimensionCount;
+        dataChunkBytes[column] = CarbonUtil.getByteArray(CarbonMetadataUtil
+            .getDataChunk3(pages, thriftColumnSchemaList,
+                dataWriterVo.getSegmentProperties(), ordinal, isDimension));
+        blockletDataSize += dataChunkBytes[column].length;
+      }
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+    }
+    // total size is the size of the data chunks plus the size of the page data
+    blockletDataSize += dataWriterHolder.getSize();
+    // to check if data size will exceed the block size then create a new file
+    updateBlockletFileChannel(blockletDataSize);
+    writeDataToFile(fileChannel, dataChunkBytes);
+    dataWriterHolder.clear();
+  }
+
+  /**
+   * Below method will be used to write data in carbon data file
+   * Data Format
+   * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+   * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+   * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+   * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+   * Each page will contain column data, Inverted index and rle index
+   * After writing, the blocklet index and the blocklet info of the written
+   * blocklet are added to blockletIndex and blockletMetadata
+   *
+   * @param channel        channel used to read the current file size
+   * @param dataChunkBytes serialized data chunk of each column, dimensions first
+   * @throws CarbonDataWriterException if an IOException occurs while writing
+   */
+  private void writeDataToFile(FileChannel channel, byte[][] dataChunkBytes) {
+    long offset = 0;
+    try {
+      // an empty file gets the version header first
+      if (fileChannel.size() == 0) {
+        ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+        byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+        fileChannel.write(ByteBuffer.wrap(header));
+      }
+      offset = channel.size();
+    } catch (IOException e) {
+      // keep the cause so that the underlying I/O failure is not lost
+      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
+    }
+    // to maintain the offset of each data chunk in blocklet
+    List<Long> currentDataChunksOffset = new ArrayList<>();
+    // to maintain the length of each data chunk in blocklet
+    List<Integer> currentDataChunksLength = new ArrayList<>();
+    // get the node holder list
+    List<NodeHolder> nodeHolderList = dataWriterHolder.getNodeHolder();
+    int numberOfDimension = nodeHolderList.get(0).getKeyArray().length;
+    int numberOfMeasures = nodeHolderList.get(0).getDataArray().length;
+    long dimensionOffset = 0;
+    long measureOffset = 0;
+    int numberOfRows = 0;
+    // calculate the number of rows in each blocklet
+    for (NodeHolder nodeHolder : nodeHolderList) {
+      numberOfRows += nodeHolder.getEntryCount();
+    }
+    try {
+      for (int i = 0; i < numberOfDimension; i++) {
+        currentDataChunksOffset.add(offset);
+        currentDataChunksLength.add(dataChunkBytes[i].length);
+        fileChannel.write(ByteBuffer.wrap(dataChunkBytes[i]));
+        offset += dataChunkBytes[i].length;
+        for (NodeHolder nodeHolder : nodeHolderList) {
+          // the same flags decide the buffer size and the buffer content
+          boolean hasRowIdIndex = !nodeHolder.getIsSortedKeyBlock()[i];
+          boolean hasRleIndex = nodeHolder.getAggBlocks()[i];
+          int bufferSize = nodeHolder.getKeyLengths()[i]
+              + (hasRowIdIndex ? nodeHolder.getKeyBlockIndexLength()[i] : 0)
+              + (hasRleIndex ? nodeHolder.getCompressedDataIndex()[i].length : 0);
+          ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+          buffer.put(nodeHolder.getKeyArray()[i]);
+          if (hasRowIdIndex) {
+            buffer.putInt(nodeHolder.getCompressedIndex()[i].length);
+            buffer.put(nodeHolder.getCompressedIndex()[i]);
+            if (nodeHolder.getCompressedIndexMap()[i].length > 0) {
+              buffer.put(nodeHolder.getCompressedIndexMap()[i]);
+            }
+          }
+          if (hasRleIndex) {
+            buffer.put(nodeHolder.getCompressedDataIndex()[i]);
+          }
+          buffer.flip();
+          fileChannel.write(buffer);
+          offset += bufferSize;
+        }
+      }
+      // offset at which the dimension data ends
+      dimensionOffset = offset;
+      // measure data chunks are stored after the dimension data chunks
+      int dataChunkStartIndex = numberOfDimension;
+      for (int i = 0; i < numberOfMeasures; i++) {
+        currentDataChunksOffset.add(offset);
+        currentDataChunksLength.add(dataChunkBytes[dataChunkStartIndex].length);
+        fileChannel.write(ByteBuffer.wrap(dataChunkBytes[dataChunkStartIndex]));
+        offset += dataChunkBytes[dataChunkStartIndex].length;
+        dataChunkStartIndex++;
+        for (NodeHolder nodeHolder : nodeHolderList) {
+          byte[] measureData = nodeHolder.getDataArray()[i];
+          fileChannel.write(ByteBuffer.wrap(measureData));
+          offset += measureData.length;
+        }
+      }
+      // offset at which the measure data ends
+      measureOffset = offset;
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the data", e);
+    }
+    blockletIndex.add(CarbonMetadataUtil
+        .getBlockletIndex(nodeHolderList, dataWriterVo.getSegmentProperties().getMeasures()));
+    BlockletInfo3 blockletInfo3 =
+        new BlockletInfo3(numberOfRows, currentDataChunksOffset, currentDataChunksLength,
+            dimensionOffset, measureOffset);
+    blockletMetadata.add(blockletInfo3);
+  }
+
+  /**
+   * Below method will be used to fill the block info details
+   * The min and max value of every column are merged over all the blocklets of the
+   * file. The last getMeasureCount() entries of the min max lists are handled as
+   * measures and compared based on the data type of the measure, the entries before
+   * them are compared as byte arrays
+   *
+   * @param numberOfRows    number of rows in file
+   * @param filePath        file path
+   * @param currentPosition current offset
+   */
+  protected void fillBlockIndexInfoDetails(long numberOfRows, String filePath,
+      long currentPosition) {
+    // start with the min max values of the first blocklet
+    byte[][] currentMinValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
+    byte[][] currentMaxValue = new byte[blockletIndex.get(0).min_max_index.max_values.size()][];
+    for (int i = 0; i < currentMaxValue.length; i++) {
+      currentMinValue[i] = blockletIndex.get(0).min_max_index.getMin_values().get(i).array();
+      currentMaxValue[i] = blockletIndex.get(0).min_max_index.getMax_values().get(i).array();
+    }
+    byte[] minValue = null;
+    byte[] maxValue = null;
+    int measureStartIndex = currentMinValue.length - dataWriterVo.getMeasureCount();
+    for (int i = 1; i < blockletIndex.size(); i++) {
+      for (int j = 0; j < measureStartIndex; j++) {
+        minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
+        maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMinValue[j], minValue) > 0) {
+          currentMinValue[j] = minValue.clone();
+        }
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMaxValue[j], maxValue) < 0) {
+          currentMaxValue[j] = maxValue.clone();
+        }
+      }
+      for (int j = measureStartIndex; j < currentMinValue.length; j++) {
+        // index of the measure in the measure list, so that every measure is
+        // compared with its own data type and not with the one of the first measure
+        int measureIndex = j - measureStartIndex;
+        minValue = blockletIndex.get(i).min_max_index.getMin_values().get(j).array();
+        maxValue = blockletIndex.get(i).min_max_index.getMax_values().get(j).array();
+
+        if (CarbonMetadataUtil.compareMeasureData(currentMinValue[j], minValue,
+            dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
+            > 0) {
+          currentMinValue[j] = minValue.clone();
+        }
+        if (CarbonMetadataUtil.compareMeasureData(currentMaxValue[j], maxValue,
+            dataWriterVo.getSegmentProperties().getMeasures().get(measureIndex).getDataType())
+            < 0) {
+          currentMaxValue[j] = maxValue.clone();
+        }
+      }
+    }
+    // start key of the first blocklet and end key of the last blocklet
+    BlockletBTreeIndex btree =
+        new BlockletBTreeIndex(blockletIndex.get(0).b_tree_index.getStart_key(),
+            blockletIndex.get(blockletIndex.size() - 1).b_tree_index.getEnd_key());
+    BlockletMinMaxIndex minmax = new BlockletMinMaxIndex();
+    minmax.setMinValues(currentMinValue);
+    minmax.setMaxValues(currentMaxValue);
+    // named fileLevelIndex so that the blockletIndex field is not shadowed
+    org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex fileLevelIndex =
+        new org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex(btree, minmax);
+    BlockIndexInfo blockIndexInfo =
+        new BlockIndexInfo(numberOfRows, filePath.substring(0, filePath.lastIndexOf('.')),
+            currentPosition, fileLevelIndex);
+    blockIndexInfoList.add(blockIndexInfo);
+  }
+
+  /**
+   * Method will be used to close the open file channel
+   * If pages are still present in the data holder they are written first, then the
+   * footer is written, the streams are closed, the file is renamed and copied to the
+   * carbon store path and the index file is written. The executor service is closed
+   * at the end
+   *
+   * @throws CarbonDataWriterException if the index file cannot be written
+   */
+  public void closeWriter() throws CarbonDataWriterException {
+    boolean hasPendingPages = !dataWriterHolder.getNodeHolder().isEmpty();
+    if (hasPendingPages) {
+      writeDataToFile(fileChannel);
+      writeBlockletInfoToFile(fileChannel, fileName);
+      CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel);
+      renameCarbonDataFile();
+      // the file name is read after the rename, without its last extension
+      int extensionStart = this.fileName.lastIndexOf('.');
+      copyCarbonDataFileToCarbonStorePath(this.fileName.substring(0, extensionStart));
+      try {
+        writeIndexFile();
+      } catch (IOException e) {
+        throw new CarbonDataWriterException("Problem while writing the index file", e);
+      }
+    }
+    closeExecutorService();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
new file mode 100644
index 0000000..0827bd0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/DataWriterHolder.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v3;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.util.NodeHolder;
+
+/**
+ * Holds the node holders (pages) added since the last clear and the running total
+ * of their data size
+ */
+public class DataWriterHolder {
+  /**
+   * node holders added since the last clear
+   */
+  private final List<NodeHolder> nodeHolder;
+  /**
+   * total size of the node holders added since the last clear
+   */
+  private long currentSize;
+
+  public DataWriterHolder() {
+    this.nodeHolder = new ArrayList<NodeHolder>();
+  }
+
+  /**
+   * Removes all the node holders and resets the size
+   */
+  public void clear() {
+    nodeHolder.clear();
+    currentSize = 0;
+  }
+
+  /**
+   * Adds the node holder and adds its dimension data length, measure data length,
+   * row id index length and rle index length to the size
+   *
+   * @param holder node holder to be added
+   */
+  public void addNodeHolder(NodeHolder holder) {
+    this.nodeHolder.add(holder);
+
+    // long accumulator so that the sum of the index lengths cannot overflow an int
+    long size = 0;
+    // add row id index length
+    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+      if (!holder.getIsSortedKeyBlock()[i]) {
+        size += holder.getKeyBlockIndexLength()[i];
+      }
+    }
+    // add rle index length
+    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+      if (holder.getAggBlocks()[i]) {
+        size += holder.getDataIndexMapLength()[i];
+      }
+    }
+    currentSize +=
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+  }
+
+  /**
+   * @return total size of the node holders added since the last clear
+   */
+  public long getSize() {
+    return currentSize;
+  }
+
+  /**
+   * @return number of node holders added since the last clear
+   */
+  public int getNumberOfPagesAdded() {
+    return nodeHolder.size();
+  }
+
+  /**
+   * @return the internal list of node holders, it is not a copy
+   */
+  public List<NodeHolder> getNodeHolder() {
+    return nodeHolder;
+  }
+}


[4/4] incubator-carbondata git commit: [CARBONDATA-726] Added code for V3 format Writer and Reader This closes #609

Posted by ja...@apache.org.
[CARBONDATA-726] Added code for V3 format Writer and Reader This closes #609


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/740358c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/740358c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/740358c1

Branch: refs/heads/master
Commit: 740358c1344a03a2795c38f1c49548b6413a60e5
Parents: 922683e 2cf1104
Author: jackylk <ja...@huawei.com>
Authored: Sun Feb 26 22:52:27 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Feb 26 22:52:27 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  38 +-
 .../constants/CarbonV3DataFormatConstants.java  |  84 ++++
 .../datastore/chunk/AbstractRawColumnChunk.java |  11 +
 .../chunk/reader/CarbonDataReaderFactory.java   |  14 +-
 .../AbstractChunkReaderV2V3Format.java          | 126 +++++
 ...mpressedDimensionChunkFileBasedReaderV2.java | 127 ++---
 ...mpressedDimensionChunkFileBasedReaderV3.java | 268 ++++++++++
 .../AbstractMeasureChunkReaderV2V3Format.java   | 124 +++++
 ...CompressedMeasureChunkFileBasedReaderV2.java | 106 +---
 ...CompressedMeasureChunkFileBasedReaderV3.java | 239 +++++++++
 .../SafeFixedLengthDimensionDataChunkStore.java |   9 +-
 .../columnar/BlockIndexerStorageForShort.java   | 228 +++++++++
 .../columnar/ColumnWithShortIndex.java          |  76 +++
 .../ColumnWithShortIndexForNoDictionay.java     |  46 ++
 .../core/metadata/ColumnarFormatVersion.java    |   9 +-
 .../executor/impl/AbstractQueryExecutor.java    |  10 +-
 .../IncludeColGroupFilterExecuterImpl.java      |  24 +
 ...velRangeLessThanEqualFilterExecuterImpl.java |  14 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  14 +-
 .../carbondata/core/util/BitSetGroup.java       |   6 +-
 .../core/util/CarbonMetadataUtil.java           | 347 ++++++++++++-
 .../carbondata/core/util/CarbonProperties.java  | 110 +++-
 .../apache/carbondata/core/util/CarbonUtil.java | 110 ++++
 .../util/DataFileFooterConverterFactory.java    |   5 +-
 .../core/util/DataFileFooterConverterV3.java    | 141 ++++++
 .../apache/carbondata/core/util/NodeHolder.java | 430 ++++++++++++++++
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 format/src/main/thrift/carbondata.thrift        |  27 +-
 .../store/CarbonDataWriterFactory.java          |   7 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  56 ++-
 .../store/writer/AbstractFactDataWriter.java    |  54 +-
 .../store/writer/CarbonFactDataWriter.java      |   1 +
 .../processing/store/writer/NodeHolder.java     | 410 ---------------
 .../writer/v1/CarbonFactDataWriterImplV1.java   |   9 +-
 .../writer/v2/CarbonFactDataWriterImplV2.java   |   8 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 499 +++++++++++++++++++
 .../store/writer/v3/DataWriterHolder.java       |  68 +++
 37 files changed, 3114 insertions(+), 743 deletions(-)
----------------------------------------------------------------------



[2/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0c2e8ab..1e8207c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.impl.StandardLogService;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.BlockIndexStore;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -60,6 +61,7 @@ import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -336,6 +338,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
         segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
         queryProperties.complexFilterDimension, allProjectionListDimensionIdexes);
+    int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+            CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE));
+
     if (dimensionsBlockIndexes.length > 0) {
       numberOfElementToConsider = dimensionsBlockIndexes[dimensionsBlockIndexes.length - 1]
           == segmentProperties.getBlockTodimensionOrdinalMapping().size() - 1 ?
@@ -343,7 +349,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
           dimensionsBlockIndexes.length;
       blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(CarbonUtil
           .getRangeIndex(dimensionsBlockIndexes, numberOfElementToConsider,
-              CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+              numberOfColumnToBeReadInOneIO));
     } else {
       blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
     }
@@ -362,7 +368,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       // setting all the measure chunk indexes to be read from file
       blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(CarbonUtil
           .getRangeIndex(measureBlockIndexes, numberOfElementToConsider,
-              CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+              numberOfColumnToBeReadInOneIO));
     } else {
       blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]);
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
index d2523a1..c64f498 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.scan.filter.executer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -24,11 +25,14 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -80,6 +84,26 @@ public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl
     return bitSet;
   }
 
+  /**
+   * Applies the column group include filter on every page of the dimension chunk that stores the
+   * filtered column and collects one bit set per page.
+   *
+   * @param blockChunkHolder holder giving access to the data block, the file reader and the raw
+   *                         dimension chunks
+   * @return bit set group sized to the page count of the chunk; no bit set is stored for a page
+   *         when the chunk has no max values
+   * @throws IOException propagated from the calls made here (presumably the chunk read)
+   */
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
+    int chunkIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColumnEvaluatorInfo.getColumnIndex());
+    // read the raw chunk only when the holder does not have it yet
+    if (blockChunkHolder.getDimensionRawDataChunk()[chunkIndex] == null) {
+      blockChunkHolder.getDimensionRawDataChunk()[chunkIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), chunkIndex);
+    }
+    DimensionRawColumnChunk rawChunk = blockChunkHolder.getDimensionRawDataChunk()[chunkIndex];
+    BitSetGroup pageBitSets = new BitSetGroup(rawChunk.getPagesCount());
+    for (int pageNo = 0; pageNo < rawChunk.getPagesCount(); pageNo++) {
+      // NOTE(review): pages are skipped when no max values are present - confirm this is intended
+      if (rawChunk.getMaxValues() == null) {
+        continue;
+      }
+      BitSet pageResult = getFilteredIndexes(rawChunk.convertToDimColDataChunk(pageNo),
+          rawChunk.getRowCount()[pageNo]);
+      pageBitSets.setBitSet(pageResult, pageNo);
+    }
+    return pageBitSets;
+  }
+
   /**
    * It is required for extracting column data from columngroup chunk
    *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 2bdce8d..c7e2acc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
       if (rawColumnChunk.getMinValues() != null) {
         if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
-          int compare = ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
-          if (compare >= 0) {
-            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
-            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          } else {
-            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
-                rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          }
+          BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+              rawColumnChunk.getRowCount()[i]);
+          bitSetGroup.setBitSet(bitSet, i);
         }
       } else {
         BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index ae9ba8a..d9795eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
       if (rawColumnChunk.getMinValues() != null) {
         if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
-          int compare = ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
-          if (compare > 0) {
-            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
-            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          } else {
-            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
-                rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          }
+          BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+              rawColumnChunk.getRowCount()[i]);
+          bitSetGroup.setBitSet(bitSet, i);
         }
       } else {
         BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
index 07e3487..323042a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
@@ -52,9 +52,11 @@ public class BitSetGroup {
   public void and(BitSetGroup group) {
     int i = 0;
     for (BitSet bitSet : bitSets) {
-      BitSet otherSet  = group.getBitSet(i);
+      BitSet otherSet = group.getBitSet(i);
       if (bitSet != null && otherSet != null) {
         bitSet.and(otherSet);
+      } else {
+        bitSets[i] = null;
       }
       i++;
     }
@@ -63,7 +65,7 @@ public class BitSetGroup {
   public void or(BitSetGroup group) {
     int i = 0;
     for (BitSet bitSet : bitSets) {
-      BitSet otherSet  = group.getBitSet(i);
+      BitSet otherSet = group.getBitSet(i);
       if (bitSet != null && otherSet != null) {
         bitSet.or(otherSet);
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 25e7cfe..55c0302 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,18 +36,22 @@ import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletBTreeIndex;
 import org.apache.carbondata.format.BlockletIndex;
 import org.apache.carbondata.format.BlockletInfo;
 import org.apache.carbondata.format.BlockletInfo2;
+import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ChunkCompressionMeta;
 import org.apache.carbondata.format.ColumnSchema;
 import org.apache.carbondata.format.CompressionCodec;
 import org.apache.carbondata.format.DataChunk;
 import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
 import org.apache.carbondata.format.Encoding;
 import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.format.IndexHeader;
@@ -109,6 +114,50 @@ public class CarbonMetadataUtil {
   }
 
   /**
+   * Builds the thrift FileFooter from the given blocklet indexes and column schema and then adds
+   * every BlockletInfo3 of the list to it.
+   *
+   * @param infoList          blocklet infos to be added to the footer
+   * @param blockletIndexs    blocklet indexes to be added to the footer
+   * @param cardinalities     cardinality of dimension columns
+   * @param columnSchemaList  column schema list
+   * @param segmentProperties segment properties, not read by this method
+   * @return FileFooter
+   * @throws IOException declared by the signature
+   */
+  public static FileFooter convertFileFooter3(List<BlockletInfo3> infoList,
+      List<BlockletIndex> blockletIndexs, int[] cardinalities, List<ColumnSchema> columnSchemaList,
+      SegmentProperties segmentProperties) throws IOException {
+    FileFooter fileFooter =
+        getFileFooter3(infoList, blockletIndexs, cardinalities, columnSchemaList);
+    // blocklet infos are appended in list order
+    for (int i = 0; i < infoList.size(); i++) {
+      fileFooter.addToBlocklet_info_list3(infoList.get(i));
+    }
+    return fileFooter;
+  }
+
+  /**
+   * Creates the file footer and fills everything except the blocklet infos: format version, total
+   * number of rows, segment info, table columns and blocklet indexes.
+   *
+   * @param infoList         blocklet info, used here only to compute the number of rows
+   * @param blockletIndexs   blocklet indexes added to the footer in list order
+   * @param cardinalities    cardinality of dimension columns
+   * @param columnSchemaList column schema list
+   * @return file footer
+   */
+  private static FileFooter getFileFooter3(List<BlockletInfo3> infoList,
+      List<BlockletIndex> blockletIndexs, int[] cardinalities,
+      List<ColumnSchema> columnSchemaList) {
+    SegmentInfo segmentDetails = new SegmentInfo();
+    segmentDetails.setNum_cols(columnSchemaList.size());
+    segmentDetails.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
+
+    // the version written is the format version configured in CarbonProperties
+    ColumnarFormatVersion formatVersion = CarbonProperties.getInstance().getFormatVersion();
+
+    FileFooter fileFooter = new FileFooter();
+    fileFooter.setVersion(formatVersion.number());
+    fileFooter.setNum_rows(getNumberOfRowForFooter(infoList));
+    fileFooter.setSegment_info(segmentDetails);
+    fileFooter.setTable_columns(columnSchemaList);
+    for (int i = 0; i < blockletIndexs.size(); i++) {
+      fileFooter.addToBlocklet_index_list(blockletIndexs.get(i));
+    }
+    return fileFooter;
+  }
+
+  /**
    * Below method will be used to get the file footer object for
    *
    * @param infoList         blocklet info
@@ -162,6 +211,20 @@ public class CarbonMetadataUtil {
     return numberOfRows;
   }
 
+  /**
+   * Adds up the row count of every blocklet to get the total number of rows of the file.
+   *
+   * @param infoList blocklet infos of the file
+   * @return sum of num_rows over all the blocklet infos, 0 for an empty list
+   */
+  private static long getNumberOfRowForFooter(List<BlockletInfo3> infoList) {
+    long totalRows = 0;
+    for (int i = 0; i < infoList.size(); i++) {
+      totalRows += infoList.get(i).num_rows;
+    }
+    return totalRows;
+  }
+
   private static BlockletIndex getBlockletIndex(BlockletInfoColumnar info) {
 
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
@@ -181,9 +244,52 @@ public class CarbonMetadataUtil {
     return blockletIndex;
   }
 
+  /**
+   * Builds the blocklet level index (min/max values and b-tree start/end key) from the pages
+   * (node holders) of one blocklet.
+   * <p>
+   * Min and max are computed over all the pages for every column, so the result does not depend
+   * on a column being ordered across the pages. Dimension values are compared as unsigned bytes,
+   * measure values are compared according to the data type of the measure. In both the max and
+   * the min list the dimension values are added first and the measure values after them.
+   *
+   * @param nodeHolderList    pages of the blocklet, must contain at least one page
+   * @param carbonMeasureList measures, in the order of the measure min/max arrays of the pages
+   * @return blocklet index with the min max index and the b-tree index set
+   */
+  public static BlockletIndex getBlockletIndex(List<NodeHolder> nodeHolderList,
+      List<CarbonMeasure> carbonMeasureList) {
+    NodeHolder firstPage = nodeHolderList.get(0);
+    NodeHolder lastPage = nodeHolderList.get(nodeHolderList.size() - 1);
+    // start with the values of the first page and widen them with every following page
+    byte[][] dimensionMaxValue = firstPage.getColumnMaxData().clone();
+    byte[][] dimensionMinValue = firstPage.getColumnMinData().clone();
+    byte[][] measureMaxValue = firstPage.getMeasureColumnMaxData().clone();
+    byte[][] measureMinValue = firstPage.getMeasureColumnMinData().clone();
+    for (int i = 1; i < nodeHolderList.size(); i++) {
+      NodeHolder page = nodeHolderList.get(i);
+      byte[][] pageDimensionMax = page.getColumnMaxData();
+      byte[][] pageDimensionMin = page.getColumnMinData();
+      for (int j = 0; j < dimensionMaxValue.length; j++) {
+        if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(pageDimensionMax[j], dimensionMaxValue[j]) > 0) {
+          dimensionMaxValue[j] = pageDimensionMax[j];
+        }
+        if (ByteUtil.UnsafeComparer.INSTANCE
+            .compareTo(pageDimensionMin[j], dimensionMinValue[j]) < 0) {
+          dimensionMinValue[j] = pageDimensionMin[j];
+        }
+      }
+      for (int j = 0; j < measureMinValue.length; j++) {
+        byte[] minVal = page.getMeasureColumnMinData()[j];
+        byte[] maxVal = page.getMeasureColumnMaxData()[j];
+        DataType measureType = carbonMeasureList.get(j).getDataType();
+        if (compareMeasureData(measureMaxValue[j], maxVal, measureType) < 0) {
+          measureMaxValue[j] = maxVal.clone();
+        }
+        if (compareMeasureData(measureMinValue[j], minVal, measureType) > 0) {
+          measureMinValue[j] = minVal.clone();
+        }
+      }
+    }
+
+    BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+    for (byte[] max : dimensionMaxValue) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+    }
+    for (byte[] min : dimensionMinValue) {
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+    }
+    for (byte[] max : measureMaxValue) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+    }
+    for (byte[] min : measureMinValue) {
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+    }
+    // the b-tree range of the blocklet goes from the first key of the first page to the last
+    // key of the last page
+    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+    blockletBTreeIndex.setStart_key(firstPage.getStartKey());
+    blockletBTreeIndex.setEnd_key(lastPage.getEndKey());
+    BlockletIndex blockletIndex = new BlockletIndex();
+    blockletIndex.setMin_max_index(blockletMinMaxIndex);
+    blockletIndex.setB_tree_index(blockletBTreeIndex);
+    return blockletIndex;
+  }
+
   /**
-   * Below method will be used to get the blocklet info object for
-   * data version 2 file
+   * Below method will be used to get the blocklet info object for data version
+   * 2 file
    *
    * @param blockletInfoColumnar blocklet info
    * @param dataChunkOffsets     data chunks offsets
@@ -222,7 +328,8 @@ public class CarbonMetadataUtil {
         encodings.add(Encoding.DIRECT_DICTIONARY);
       }
       dataChunk.setRowMajor(colGrpblock[i]);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setColumn_ids(new ArrayList<Integer>());
       dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
       dataChunk.setData_page_offset(blockletInfoColumnar.getKeyOffSets()[i]);
@@ -242,7 +349,8 @@ public class CarbonMetadataUtil {
         j++;
       }
 
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       dataChunk.setEncoders(encodings);
 
       colDataChunks.add(dataChunk);
@@ -252,24 +360,26 @@ public class CarbonMetadataUtil {
       DataChunk dataChunk = new DataChunk();
       dataChunk.setChunk_meta(getChunkCompressionMeta());
       dataChunk.setRowMajor(false);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setColumn_ids(new ArrayList<Integer>());
       dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
       dataChunk.setData_page_offset(blockletInfoColumnar.getMeasureOffset()[i]);
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       List<Encoding> encodings = new ArrayList<Encoding>();
       encodings.add(Encoding.DELTA);
       dataChunk.setEncoders(encodings);
-      //TODO writing dummy presence meta need to set actual presence
-      //meta
+      // TODO writing dummy presence meta need to set actual presence
+      // meta
       PresenceMeta presenceMeta = new PresenceMeta();
       presenceMeta.setPresent_bit_streamIsSet(true);
       presenceMeta
           .setPresent_bit_stream(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray());
       dataChunk.setPresence(presenceMeta);
-      //TODO : PresenceMeta needs to be implemented and set here
+      // TODO : PresenceMeta needs to be implemented and set here
       // dataChunk.setPresence(new PresenceMeta());
-      //TODO : Need to write ValueCompression meta here.
+      // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
       encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
           createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -291,7 +401,7 @@ public class CarbonMetadataUtil {
   private static boolean containsEncoding(int blockIndex, Encoding encoding,
       List<ColumnSchema> columnSchemas, SegmentProperties segmentProperties) {
     Set<Integer> dimOrdinals = segmentProperties.getDimensionOrdinalForBlock(blockIndex);
-    //column groups will always have dictionary encoding
+    // column groups will always have dictionary encoding
     if (dimOrdinals.size() > 1 && Encoding.DICTIONARY == encoding) {
       return true;
     }
@@ -336,7 +446,8 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * It converts FileFooter thrift object to list of BlockletInfoColumnar objects
+   * It converts FileFooter thrift object to list of BlockletInfoColumnar
+   * objects
    *
    * @param footer
    * @return
@@ -486,8 +597,8 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Below method will be used to get the block index info thrift object for each block
-   * present in the segment
+   * Below method will be used to get the block index info thrift object for
+   * each block present in the segment
    *
    * @param blockIndexInfoList block index info list
    * @return list of block index
@@ -508,8 +619,7 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Below method will be used to get the data chunk object for all the
-   * columns
+   * Below method will be used to get the data chunk object for all the columns
    *
    * @param blockletInfoColumnar blocklet info
    * @param columnSchenma        list of columns
@@ -536,7 +646,8 @@ public class CarbonMetadataUtil {
         encodings.add(Encoding.DIRECT_DICTIONARY);
       }
       dataChunk.setRowMajor(colGrpblock[i]);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
       if (aggKeyBlock[i]) {
         dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
@@ -552,7 +663,8 @@ public class CarbonMetadataUtil {
         rowIdIndex++;
       }
 
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       dataChunk.setEncoders(encodings);
 
       colDataChunks.add(dataChunk);
@@ -562,22 +674,24 @@ public class CarbonMetadataUtil {
       DataChunk2 dataChunk = new DataChunk2();
       dataChunk.setChunk_meta(getChunkCompressionMeta());
       dataChunk.setRowMajor(false);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       List<Encoding> encodings = new ArrayList<Encoding>();
       encodings.add(Encoding.DELTA);
       dataChunk.setEncoders(encodings);
-      //TODO writing dummy presence meta need to set actual presence
-      //meta
+      // TODO writing dummy presence meta need to set actual presence
+      // meta
       PresenceMeta presenceMeta = new PresenceMeta();
       presenceMeta.setPresent_bit_streamIsSet(true);
       presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
           .compressByte(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()));
       dataChunk.setPresence(presenceMeta);
-      //TODO : PresenceMeta needs to be implemented and set here
+      // TODO : PresenceMeta needs to be implemented and set here
       // dataChunk.setPresence(new PresenceMeta());
-      //TODO : Need to write ValueCompression meta here.
+      // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
       encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
           createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -586,4 +700,189 @@ public class CarbonMetadataUtil {
     }
     return colDataChunks;
   }
+
+  /**
+   * Below method will be used to get the page level data chunk objects of one column, one
+   * DataChunk2 for every page (node holder) of the blocklet
+   *
+   * @param nodeHolderList    pages of the blocklet
+   * @param columnSchenma     list of columns
+   * @param segmentProperties segment properties
+   * @param index             index of the column in the dimension arrays of the node holder when
+   *                          isDimensionColumn is true, otherwise in its measure arrays
+   * @param isDimensionColumn whether the column is a dimension or a measure
+   * @return list of data chunks, in the order of nodeHolderList
+   * @throws IOException
+   */
+  private static List<DataChunk2> getDatachunk2(List<NodeHolder> nodeHolderList,
+      List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+      boolean isDimensionColumn) throws IOException {
+    List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>();
+    for (NodeHolder nodeHolder : nodeHolderList) {
+      DataChunk2 dataChunk = new DataChunk2();
+      dataChunk.min_max = new BlockletMinMaxIndex();
+      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount());
+      List<Encoding> encodings = new ArrayList<Encoding>();
+      if (isDimensionColumn) {
+        dataChunk.setData_page_length(nodeHolder.getKeyLengths()[index]);
+        if (containsEncoding(index, Encoding.DICTIONARY, columnSchenma, segmentProperties)) {
+          encodings.add(Encoding.DICTIONARY);
+        }
+        if (containsEncoding(index, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) {
+          encodings.add(Encoding.DIRECT_DICTIONARY);
+        }
+        dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[index]);
+        // TODO : Once schema PR is merged and information needs to be passed
+        // here.
+        if (nodeHolder.getAggBlocks()[index]) {
+          dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[index]);
+          encodings.add(Encoding.RLE);
+        }
+        dataChunk.setSort_state(nodeHolder.getIsSortedKeyBlock()[index] ?
+            SortState.SORT_EXPLICIT :
+            SortState.SORT_NATIVE);
+
+        // the row id page and the inverted index encoding are written only for the blocks
+        // which are not flagged as sorted key block
+        if (!nodeHolder.getIsSortedKeyBlock()[index]) {
+          dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[index]);
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
+        dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getColumnMaxData()[index]));
+        dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getColumnMinData()[index]));
+      } else {
+        dataChunk.setData_page_length(nodeHolder.getDataArray()[index].length);
+        dataChunk.setRowMajor(false);
+        // TODO : Right now the encodings are happening at runtime. change as
+        // per this encoders.
+        encodings.add(Encoding.DELTA);
+        // TODO writing dummy presence meta need to set actual presence
+        // meta
+        PresenceMeta presenceMeta = new PresenceMeta();
+        presenceMeta.setPresent_bit_streamIsSet(true);
+        presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
+            .compressByte(nodeHolder.getMeasureNullValueIndex()[index].toByteArray()));
+        dataChunk.setPresence(presenceMeta);
+        List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
+        encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
+            createValueEncoderMeta(nodeHolder.getCompressionModel(), index))));
+        dataChunk.setEncoder_meta(encoderMetaList);
+        dataChunk.min_max
+            .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index]));
+        dataChunk.min_max
+            .addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[index]));
+      }
+      // the encodings list is complete only here, so it is set once for both kinds of column
+      dataChunk.setEncoders(encodings);
+      colDataChunks.add(dataChunk);
+    }
+    return colDataChunks;
+  }
+
+  /**
+   * Wraps the page level data chunks of one column into a DataChunk3 and records for every page
+   * its length and its offset. The length of a page is the sum of its data, rle and row id page
+   * lengths, the offset of a page is the sum of the lengths of the pages before it.
+   *
+   * @param nodeHolderList    pages of the blocklet
+   * @param columnSchenma     list of columns
+   * @param segmentProperties segment properties
+   * @param index             index of the column
+   * @param isDimensionColumn whether the column is a dimension or a measure
+   * @return data chunk holding the page chunks, the page lengths and the page offsets
+   * @throws IOException
+   */
+  public static DataChunk3 getDataChunk3(List<NodeHolder> nodeHolderList,
+      List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+      boolean isDimensionColumn) throws IOException {
+    List<DataChunk2> pages =
+        getDatachunk2(nodeHolderList, columnSchenma, segmentProperties, index, isDimensionColumn);
+    List<Integer> pageOffsets = new ArrayList<>();
+    List<Integer> pageLengths = new ArrayList<>();
+    int runningOffset = 0;
+    for (DataChunk2 page : pages) {
+      int pageLength = page.getData_page_length() + page.getRle_page_length()
+          + page.getRowid_page_length();
+      pageOffsets.add(runningOffset);
+      pageLengths.add(pageLength);
+      runningOffset += pageLength;
+    }
+    DataChunk3 columnChunk = new DataChunk3();
+    columnChunk.setData_chunk_list(pages);
+    columnChunk.setPage_length(pageLengths);
+    columnChunk.setPage_offset(pageOffsets);
+    return columnChunk;
+  }
+
+  public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
+    ByteBuffer buffer = null;
+    if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+      buffer = ByteBuffer.allocate(
+          (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+              + 3);
+      buffer.putChar(valueEncoderMeta.getType());
+      buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
+      buffer.putDouble((Double) valueEncoderMeta.getMinValue());
+      buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
+    } else if (valueEncoderMeta.getType() == CarbonCommonConstants.BIG_INT_MEASURE) {
+      buffer = ByteBuffer.allocate(
+          (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+              + 3);
+      buffer.putChar(valueEncoderMeta.getType());
+      buffer.putLong((Long) valueEncoderMeta.getMaxValue());
+      buffer.putLong((Long) valueEncoderMeta.getMinValue());
+      buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
+    } else {
+      buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+      buffer.putChar(valueEncoderMeta.getType());
+    }
+    buffer.putInt(valueEncoderMeta.getDecimal());
+    buffer.put(valueEncoderMeta.getDataTypeSelected());
+    buffer.flip();
+    return buffer.array();
+  }
+
+  /**
+   * Converts a measure value to its byte form: 8 bytes for DOUBLE, 8 bytes for LONG, INT and
+   * SHORT, and the DataTypeUtil big decimal bytes for DECIMAL.
+   *
+   * @param data     measure value; cast to Double for DOUBLE, to Long for LONG, INT and SHORT
+   *                 and to BigDecimal for DECIMAL
+   * @param dataType data type of the measure
+   * @return bytes of the value
+   * @throws IllegalArgumentException for any other data type
+   */
+  public static byte[] getByteValueForMeasure(Object data, DataType dataType) {
+    switch (dataType) {
+      case DOUBLE:
+        // array() returns the whole backing array, no flip is needed to read it
+        return ByteBuffer.allocate(8).putDouble((Double) data).array();
+      case LONG:
+      case INT:
+      case SHORT:
+        return ByteBuffer.allocate(8).putLong((Long) data).array();
+      case DECIMAL:
+        return DataTypeUtil.bigDecimalToByte((BigDecimal) data);
+      default:
+        throw new IllegalArgumentException("Invalid data type");
+    }
+  }
+
+  /**
+   * Compares two measure values given in their byte form.
+   * <p>
+   * DOUBLE values are read as an 8 byte double and LONG, INT and SHORT values as an 8 byte long
+   * from the start of each array, DECIMAL values are converted back to BigDecimal. The values
+   * are compared with Double.compare, Long.compare and BigDecimal.compareTo, so the result is
+   * correct also for differences smaller than 1 and for differences which do not fit an int.
+   *
+   * @param first    bytes of the first value
+   * @param second   bytes of the second value
+   * @param dataType data type of the measure
+   * @return negative value, zero or positive value when the first value is smaller than, equal
+   *         to or greater than the second value
+   * @throws IllegalArgumentException for any other data type
+   */
+  public static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
+    switch (dataType) {
+      case DOUBLE:
+        return Double.compare(ByteBuffer.wrap(first).getDouble(),
+            ByteBuffer.wrap(second).getDouble());
+      case LONG:
+      case INT:
+      case SHORT:
+        return Long.compare(ByteBuffer.wrap(first).getLong(), ByteBuffer.wrap(second).getLong());
+      case DECIMAL:
+        return DataTypeUtil.byteToBigDecimal(first)
+            .compareTo(DataTypeUtil.byteToBigDecimal(second));
+      default:
+        throw new IllegalArgumentException("Invalid data type");
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 962d352..39c36ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
 public final class CarbonProperties {
@@ -85,6 +86,9 @@ public final class CarbonProperties {
     validateCarbonDataFileVersion();
     validateExecutorStartUpTime();
     validatePrefetchBufferSize();
+    validateNumberOfPagesPerBlocklet();
+    validateNumberOfColumnPerIORead();
+    validateNumberOfRowsPerBlockletColumnPage();
   }
 
   private void validatePrefetchBufferSize() {
@@ -107,6 +111,93 @@ public final class CarbonProperties {
     }
   }
 
+  /**
+   * Validates the configured number of pages per blocklet column. A value that cannot be
+   * parsed as a short, or that lies outside the allowed minimum/maximum, is logged and
+   * replaced by the default value.
+   */
+  private void validateNumberOfPagesPerBlocklet() {
+    String configuredValue = carbonProperties
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+            CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+    boolean valid;
+    try {
+      short pages = Short.parseShort(configuredValue);
+      valid = pages >= CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN
+          && pages <= CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX;
+    } catch (NumberFormatException e) {
+      // an unparsable setting is handled the same way as an out-of-range one
+      valid = false;
+    }
+    if (valid) {
+      return;
+    }
+    LOGGER.info(
+        "The Number Of pages per blocklet column value \"" + configuredValue
+            + "\" is invalid. Using the default value \""
+            + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+    carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+        CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+  }
+
+  /**
+   * Validates the configured number of columns read in one IO. A value that cannot be
+   * parsed as a short, or that lies outside the allowed minimum/maximum, is logged and
+   * replaced by the default value.
+   */
+  private void validateNumberOfColumnPerIORead() {
+    String configuredValue = carbonProperties
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+            CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+    boolean valid;
+    try {
+      short columns = Short.parseShort(configuredValue);
+      valid = columns >= CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN
+          && columns <= CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX;
+    } catch (NumberFormatException e) {
+      // an unparsable setting is handled the same way as an out-of-range one
+      valid = false;
+    }
+    if (!valid) {
+      // the message must name this property so the user knows which setting was rejected
+      LOGGER.info("The Number Of columns to read in one IO value \"" + configuredValue
+          + "\" is invalid. Using the default value \""
+          + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+      carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+          CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+    }
+  }
+
+  /**
+   * Validates the configured number of rows per blocklet column page. A value that cannot
+   * be parsed as a short, or that lies outside the allowed minimum/maximum, is logged and
+   * replaced by the default value.
+   */
+  private void validateNumberOfRowsPerBlockletColumnPage() {
+    String configuredRows = carbonProperties
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+    boolean withinRange;
+    try {
+      short rows = Short.parseShort(configuredRows);
+      withinRange =
+          rows >= CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN
+              && rows <= CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX;
+    } catch (NumberFormatException e) {
+      // an unparsable setting is handled the same way as an out-of-range one
+      withinRange = false;
+    }
+    if (withinRange) {
+      return;
+    }
+    LOGGER.info("The Number Of rows per blocklet column pages value \""
+        + configuredRows + "\" is invalid. Using the default value \""
+        + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+    carbonProperties
+        .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+  }
+
   private void validateBadRecordsLocation() {
     String badRecordsLocation =
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
@@ -288,17 +379,16 @@ public final class CarbonProperties {
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
     if (carbondataFileVersionString == null) {
       // use default property if user does not specify version property
-      carbonProperties
-          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
-              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+          CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
     } else {
       try {
         ColumnarFormatVersion.valueOf(carbondataFileVersionString);
       } catch (IllegalArgumentException e) {
         // use default property if user specifies an invalid version property
-        LOGGER.warn("Specified file version property is invalid: " +
-            carbondataFileVersionString + ". Using " +
-            CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as default file version");
+        LOGGER.warn("Specified file version property is invalid: " + carbondataFileVersionString
+            + ". Using " + CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
+            + " as default file version");
         carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
             CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
       }
@@ -569,6 +659,7 @@ public final class CarbonProperties {
 
   /**
    * Returns configured update deleta files value for IUD compaction
+   *
    * @return numberOfDeltaFilesThreshold
    */
   public int getNoUpdateDeltaFilesThresholdForIUDCompaction() {
@@ -588,8 +679,7 @@ public final class CarbonProperties {
       }
     } catch (NumberFormatException e) {
       LOGGER.error("The specified value for property "
-          + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
-          + "is incorrect."
+          + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
           + " Correct value should be in range of 0 -10000. Taking the default value.");
       numberOfDeltaFilesThreshold = Integer
           .parseInt(CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
@@ -599,6 +689,7 @@ public final class CarbonProperties {
 
   /**
    * Returns configured delete deleta files value for IUD compaction
+   *
    * @return numberOfDeltaFilesThreshold
    */
   public int getNoDeleteDeltaFilesThresholdForIUDCompaction() {
@@ -618,8 +709,7 @@ public final class CarbonProperties {
       }
     } catch (NumberFormatException e) {
       LOGGER.error("The specified value for property "
-          + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
-          + "is incorrect."
+          + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
           + " Correct value should be in range of 0 -10000. Taking the default value.");
       numberOfDeltaFilesThreshold = Integer
           .parseInt(CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9a96d2..5a656e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -73,6 +73,7 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -471,6 +472,28 @@ public final class CarbonUtil {
             numberCompressor.unCompress(indexMap, 0, indexMap.length));
   }
 
+  /**
+   * Reads a column index section from the buffer and uncompresses it. The section starts at
+   * offset with an int holding the byte length of the index data; the index data follows and
+   * the remaining bytes of the section are read as the index map.
+   *
+   * @param totalLength length in bytes of the section; the index data length and
+   *                    CarbonCommonConstants.INT_SIZE_IN_BYTE are subtracted from it to get
+   *                    the index map length
+   * @param buffer      buffer holding the section; its position is changed by this call
+   * @param offset      position in the buffer where the section starts
+   * @return result of UnBlockIndexer.uncompressIndex for the index data and the index map
+   */
+  public static int[] getUnCompressColumnIndex(int totalLength, ByteBuffer buffer, int offset) {
+    buffer.position(offset);
+    int dataLength = buffer.getInt();
+    // the index data is read first so the buffer is positioned at the index map afterwards
+    int[] data = getIntArray(buffer, buffer.position(), dataLength);
+    int mapLength = totalLength - dataLength - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    int[] map = getIntArray(buffer, buffer.position(), mapLength);
+    return UnBlockIndexer.uncompressIndex(data, map);
+  }
+
+  /**
+   * Reads consecutive short values from the buffer and widens each of them to int.
+   *
+   * @param data   buffer to read from; unless length is 0 its position is moved to offset and
+   *               advanced past the values read
+   * @param offset position of the first short value
+   * @param length number of bytes to read; length / 2 short values are returned
+   * @return the values read, or an empty array when length is 0
+   */
+  public static int[] getIntArray(ByteBuffer data, int offset, int length) {
+    if (length == 0) {
+      return new int[0];
+    }
+    data.position(offset);
+    int[] values = new int[length / 2];
+    for (int i = 0; i < values.length; i++) {
+      values[i] = data.getShort();
+    }
+    return values;
+  }
+
   /**
    * Convert int array to Integer list
    *
@@ -1233,6 +1256,18 @@ public final class CarbonUtil {
     }, offset, length);
   }
 
+  /**
+   * Copies length bytes starting at offset out of the buffer and reads them as a thrift
+   * DataChunk3 object.
+   *
+   * @param dataChunkBuffer buffer holding the serialized chunk; its position is changed
+   * @param offset          position of the first serialized byte
+   * @param length          number of serialized bytes
+   * @return the DataChunk3 read from the copied bytes
+   * @throws IOException declared for failures while reading the thrift object
+   */
+  public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length)
+      throws IOException {
+    ThriftReader.TBaseCreator creator = new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new DataChunk3();
+      }
+    };
+    byte[] serialized = new byte[length];
+    dataChunkBuffer.position(offset);
+    dataChunkBuffer.get(serialized);
+    return (DataChunk3) read(serialized, creator, 0, length);
+  }
+
   public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
       throws IOException {
     byte[] data = new byte[length];
@@ -1293,6 +1328,35 @@ public final class CarbonUtil {
     return meta;
   }
 
+  /**
+   * Builds a ValueEncoderMeta from its serialized form. The bytes are read in this order: a
+   * char measure type, the max, min and unique values (three doubles for the sum/count
+   * measure type, three longs for the big int measure type, nothing for the big decimal
+   * measure type, whose values are set to 0.0), an int decimal and a byte selected data type.
+   *
+   * @param encodeMeta serialized encoder meta
+   * @return the deserialized encoder meta
+   * @throws IllegalArgumentException if the measure type is none of the three handled types
+   */
+  public static ValueEncoderMeta deserializeEncoderMetaNew(byte[] encodeMeta) {
+    ByteBuffer source = ByteBuffer.wrap(encodeMeta);
+    char type = source.getChar();
+    ValueEncoderMeta meta = new ValueEncoderMeta();
+    meta.setType(type);
+    if (type == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+      meta.setMaxValue(source.getDouble());
+      meta.setMinValue(source.getDouble());
+      meta.setUniqueValue(source.getDouble());
+    } else if (type == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+      meta.setMaxValue(0.0);
+      meta.setMinValue(0.0);
+      meta.setUniqueValue(0.0);
+    } else if (type == CarbonCommonConstants.BIG_INT_MEASURE) {
+      meta.setMaxValue(source.getLong());
+      meta.setMinValue(source.getLong());
+      meta.setUniqueValue(source.getLong());
+    } else {
+      throw new IllegalArgumentException("invalid measure type");
+    }
+    meta.setDecimal(source.getInt());
+    meta.setDataTypeSelected(source.get());
+    return meta;
+  }
+
   /**
    * Below method will be used to convert indexes in range
    * Indexes=[0,1,2,3,4,5,6,7,8,9]
@@ -1454,5 +1518,51 @@ public final class CarbonUtil {
         return null;
     }
   }
+
+  /**
+   * Below method will be used to convert byte data to surrogate key based
+   * column value size. The bytes are combined most significant byte first.
+   *
+   * @param data                data
+   * @param startOffsetOfData   start offset of data
+   * @param eachColumnValueSize size of each column value in bytes, from 1 to 4
+   * @return surrogate key
+   * @throws IllegalArgumentException if eachColumnValueSize is not between 1 and 4
+   */
+  public static int getSurrogateInternal(byte[] data, int startOffsetOfData,
+      int eachColumnValueSize) {
+    // one branch per size keeps this free of loops; each byte is masked to drop sign extension
+    switch (eachColumnValueSize) {
+      case 1:
+        return data[startOffsetOfData] & 0xFF;
+      case 2:
+        return ((data[startOffsetOfData] & 0xFF) << 8)
+            | (data[startOffsetOfData + 1] & 0xFF);
+      case 3:
+        return ((data[startOffsetOfData] & 0xFF) << 16)
+            | ((data[startOffsetOfData + 1] & 0xFF) << 8)
+            | (data[startOffsetOfData + 2] & 0xFF);
+      case 4:
+        return ((data[startOffsetOfData] & 0xFF) << 24)
+            | ((data[startOffsetOfData + 1] & 0xFF) << 16)
+            | ((data[startOffsetOfData + 2] & 0xFF) << 8)
+            | (data[startOffsetOfData + 3] & 0xFF);
+      default:
+        throw new IllegalArgumentException(
+            "Int cannot be more than 4 bytes: " + eachColumnValueSize);
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
index 08bfd6d..153fcb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -56,8 +56,11 @@ public class DataFileFooterConverterFactory {
     switch (version) {
       case V1:
         return new DataFileFooterConverter();
-      default:
+      case V2:
         return new DataFileFooterConverter2();
+      case V3:
+      default:
+        return new DataFileFooterConverterV3();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
new file mode 100644
index 0000000..1ab3133
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonFooterReader;
+import org.apache.carbondata.format.FileFooter;
+
+/**
+ * Footer converter for the V3 data file format: reads the thrift FileFooter of a block and
+ * converts it to the wrapper DataFileFooter, using the BlockletInfo3 list of the footer.
+ */
+public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
+
+  /**
+   * Reads the thrift file footer of the given block and converts it to the wrapper
+   * DataFileFooter: version, number of rows, segment info, column schemas and, for every
+   * blocklet, its info and index.
+   *
+   * @param tableBlockInfo
+   *        table block info supplying the file path and block offset the footer is read with
+   * @return data file footer
+   * @throws IOException declared for failures while reading the footer
+   */
+  @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
+      throws IOException {
+    DataFileFooter dataFileFooter = new DataFileFooter();
+    CarbonFooterReader reader =
+        new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
+    FileFooter footer = reader.readFooter();
+    dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
+    dataFileFooter.setNumberOfRows(footer.getNum_rows());
+    dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    for (org.apache.carbondata.format.ColumnSchema thriftColumn : footer.getTable_columns()) {
+      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(thriftColumn));
+    }
+    dataFileFooter.setColumnInTable(columnSchemaList);
+
+    List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
+        footer.getBlocklet_index_list();
+    List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>();
+    for (org.apache.carbondata.format.BlockletIndex thriftIndex : leaf_node_indices_Thrift) {
+      blockletIndexList.add(getBlockletIndex(thriftIndex));
+    }
+    List<org.apache.carbondata.format.BlockletInfo3> leaf_node_infos_Thrift =
+        footer.getBlocklet_info_list3();
+    List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
+    // the dimension column count depends only on the schema, so it is computed once
+    int numberOfDimensionColumns = getNumberOfDimensionColumns(columnSchemaList);
+    for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
+      BlockletInfo blockletInfo =
+          getBlockletInfo(leaf_node_infos_Thrift.get(i), numberOfDimensionColumns);
+      // blocklet infos and blocklet indexes are paired by their position in the footer lists
+      blockletInfo.setBlockletIndex(blockletIndexList.get(i));
+      blockletInfoList.add(blockletInfo);
+    }
+    dataFileFooter.setBlockletList(blockletInfoList);
+    dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
+    return dataFileFooter;
+  }
+
+  /**
+   * Below method is to convert the blocklet info of the thrift to wrapper
+   * blocklet info. The first numberOfDimensionColumns entries of the chunk offset and chunk
+   * length lists are taken as dimension chunks and the remaining entries as measure chunks.
+   *
+   * @param blockletInfoThrift       blocklet info of the thrift
+   * @param numberOfDimensionColumns number of leading entries that are dimension chunks
+   * @return blocklet info wrapper
+   */
+  private BlockletInfo getBlockletInfo(
+      org.apache.carbondata.format.BlockletInfo3 blockletInfoThrift, int numberOfDimensionColumns) {
+    List<Long> chunkOffsets = blockletInfoThrift.getColumn_data_chunks_offsets();
+    List<Integer> chunkLengths = blockletInfoThrift.getColumn_data_chunks_length();
+    BlockletInfo blockletInfo = new BlockletInfo();
+    blockletInfo.setDimensionChunkOffsets(chunkOffsets.subList(0, numberOfDimensionColumns));
+    blockletInfo.setMeasureChunkOffsets(
+        chunkOffsets.subList(numberOfDimensionColumns, chunkOffsets.size()));
+    blockletInfo.setDimensionChunksLength(chunkLengths.subList(0, numberOfDimensionColumns));
+    // each list is bounded by its own size rather than by the size of the other list
+    blockletInfo.setMeasureChunksLength(
+        chunkLengths.subList(numberOfDimensionColumns, chunkLengths.size()));
+    blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
+    blockletInfo.setDimensionOffset(blockletInfoThrift.getDimension_offsets());
+    blockletInfo.setMeasureOffsets(blockletInfoThrift.getMeasure_offsets());
+    return blockletInfo;
+  }
+
+  /**
+   * Below method will be used to get the number of dimension column
+   * in carbon column schema. Every columnar dimension counts as one; consecutive
+   * non-columnar dimensions that share a column group id count as one together. Counting
+   * stops at the first column that is not a dimension.
+   *
+   * @param columnSchemaList column schema list
+   * @return number of dimension column
+   */
+  private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
+    int numberOfDimensionColumns = 0;
+    int previousColumnGroupId = -1;
+    for (ColumnSchema columnSchema : columnSchemaList) {
+      if (!columnSchema.isDimensionColumn()) {
+        break;
+      }
+      if (columnSchema.isColumnar()) {
+        numberOfDimensionColumns++;
+      } else if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
+        previousColumnGroupId = columnSchema.getColumnGroupId();
+        numberOfDimensionColumns++;
+      }
+    }
+    return numberOfDimensionColumns;
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
new file mode 100644
index 0000000..d46b806
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+
+/**
+ * Plain mutable holder for key and measure arrays, their lengths, index data, min/max data
+ * and the compression model. It carries no logic of its own; values are set and read through
+ * the accessors below.
+ */
+public class NodeHolder {
+  /**
+   * keyArray
+   */
+  private byte[][] keyArray;
+
+  /**
+   * dataArray
+   */
+  private byte[][] dataArray;
+
+  /**
+   * measureLenght
+   */
+  private int[] measureLenght;
+
+  /**
+   * startKey
+   */
+  private byte[] startKey;
+
+  /**
+   * endKey
+   */
+  private byte[] endKey;
+
+  /**
+   * entryCount
+   */
+  private int entryCount;
+  /**
+   * keyLengths
+   */
+  private int[] keyLengths;
+
+  /**
+   * dataAfterCompression
+   */
+  private short[][] dataAfterCompression;
+
+  /**
+   * indexMap
+   */
+  private short[][] indexMap;
+
+  /**
+   * keyBlockIndexLength
+   */
+  private int[] keyBlockIndexLength;
+
+  /**
+   * isSortedKeyBlock
+   */
+  private boolean[] isSortedKeyBlock;
+
+  private byte[][] compressedIndex;
+
+  private byte[][] compressedIndexMap;
+
+  /**
+   * dataIndexMapLength
+   */
+  private int[] dataIndexMapLength;
+
+  /**
+   * dataIndexMapOffsets
+   */
+  private int[] dataIndexMapOffsets;
+
+  /**
+   * compressedDataIndex
+   */
+  private byte[][] compressedDataIndex;
+
+  /**
+   * column max data
+   */
+  private byte[][] columnMaxData;
+
+  /**
+   * column min data
+   */
+  private byte[][] columnMinData;
+
+  private byte[][] measureColumnMaxData;
+
+  private byte[][] measureColumnMinData;
+
+  /**
+   * compression model for numbers data block.
+   */
+  private WriterCompressModel compressionModel;
+
+  /**
+   * array of aggBlocks flag to identify the aggBlocks
+   */
+  private boolean[] aggBlocks;
+
+  /**
+   * all columns max value
+   */
+  private byte[][] allMaxValue;
+
+  /**
+   * all columns min value
+   */
+  private byte[][] allMinValue;
+
+  /**
+   * true if given index is colgroup block
+   */
+  private boolean[] colGrpBlock;
+
+  /**
+   * bit set which will holds the measure
+   * indexes which are null
+   */
+  private BitSet[] measureNullValueIndex;
+
+  /**
+   * total length of dimension values
+   */
+  private int totalDimensionArrayLength;
+
+  /**
+   * total length of all measure values
+   */
+  private int totalMeasureArrayLength;
+
+  /**
+   * @return the keyArray
+   */
+  public byte[][] getKeyArray() {
+    return keyArray;
+  }
+
+  /**
+   * @param keyArray the keyArray to set
+   */
+  public void setKeyArray(byte[][] keyArray) {
+    this.keyArray = keyArray;
+  }
+
+  /**
+   * @return the dataArray
+   */
+  public byte[][] getDataArray() {
+    return dataArray;
+  }
+
+  /**
+   * @param dataArray the dataArray to set
+   */
+  public void setDataArray(byte[][] dataArray) {
+    this.dataArray = dataArray;
+  }
+
+  /**
+   * @return the measureLenght
+   */
+  public int[] getMeasureLenght() {
+    return measureLenght;
+  }
+
+  /**
+   * @param measureLenght the measureLenght to set
+   */
+  public void setMeasureLenght(int[] measureLenght) {
+    this.measureLenght = measureLenght;
+  }
+
+  /**
+   * @return the startKey
+   */
+  public byte[] getStartKey() {
+    return startKey;
+  }
+
+  /**
+   * @param startKey the startKey to set
+   */
+  public void setStartKey(byte[] startKey) {
+    this.startKey = startKey;
+  }
+
+  /**
+   * @return the endKey
+   */
+  public byte[] getEndKey() {
+    return endKey;
+  }
+
+  /**
+   * @param endKey the endKey to set
+   */
+  public void setEndKey(byte[] endKey) {
+    this.endKey = endKey;
+  }
+
+  /**
+   * @return the entryCount
+   */
+  public int getEntryCount() {
+    return entryCount;
+  }
+
+  /**
+   * @param entryCount the entryCount to set
+   */
+  public void setEntryCount(int entryCount) {
+    this.entryCount = entryCount;
+  }
+
+  /**
+   * @return the keyLengths
+   */
+  public int[] getKeyLengths() {
+    return keyLengths;
+  }
+
+  public void setKeyLengths(int[] keyLengths) {
+    this.keyLengths = keyLengths;
+  }
+
+  /**
+   * @return the keyBlockIndexLength
+   */
+  public int[] getKeyBlockIndexLength() {
+    return keyBlockIndexLength;
+  }
+
+  /**
+   * @param keyBlockIndexLength the keyBlockIndexLength to set
+   */
+  public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
+    this.keyBlockIndexLength = keyBlockIndexLength;
+  }
+
+  /**
+   * @return the isSortedKeyBlock
+   */
+  public boolean[] getIsSortedKeyBlock() {
+    return isSortedKeyBlock;
+  }
+
+  /**
+   * @param isSortedKeyBlock the isSortedKeyBlock to set
+   */
+  public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
+    this.isSortedKeyBlock = isSortedKeyBlock;
+  }
+
+  /**
+   * @return the compressedIndex
+   */
+  public byte[][] getCompressedIndex() {
+    return compressedIndex;
+  }
+
+  public void setCompressedIndex(byte[][] compressedIndex) {
+    this.compressedIndex = compressedIndex;
+  }
+
+  /**
+   * @return the compressedIndexMap
+   */
+  public byte[][] getCompressedIndexMap() {
+    return compressedIndexMap;
+  }
+
+  /**
+   * @param compressedIndexMap the compressedIndexMap to set
+   */
+  public void setCompressedIndexMap(byte[][] compressedIndexMap) {
+    this.compressedIndexMap = compressedIndexMap;
+  }
+
+  /**
+   * @return the compressedDataIndex
+   */
+  public byte[][] getCompressedDataIndex() {
+    return compressedDataIndex;
+  }
+
+  /**
+   * @param compressedDataIndex the compressedDataIndex to set
+   */
+  public void setCompressedDataIndex(byte[][] compressedDataIndex) {
+    this.compressedDataIndex = compressedDataIndex;
+  }
+
+  /**
+   * @return the dataIndexMapLength
+   */
+  public int[] getDataIndexMapLength() {
+    return dataIndexMapLength;
+  }
+
+  /**
+   * @param dataIndexMapLength the dataIndexMapLength to set
+   */
+  public void setDataIndexMapLength(int[] dataIndexMapLength) {
+    this.dataIndexMapLength = dataIndexMapLength;
+  }
+
+  public byte[][] getColumnMaxData() {
+    return this.columnMaxData;
+  }
+
+  public void setColumnMaxData(byte[][] columnMaxData) {
+    this.columnMaxData = columnMaxData;
+  }
+
+  public byte[][] getColumnMinData() {
+    return this.columnMinData;
+  }
+
+  public void setColumnMinData(byte[][] columnMinData) {
+    this.columnMinData = columnMinData;
+  }
+
+  public WriterCompressModel getCompressionModel() {
+    return compressionModel;
+  }
+
+  public void setCompressionModel(WriterCompressModel compressionModel) {
+    this.compressionModel = compressionModel;
+  }
+
+  /**
+   * returns array of aggBlocks flag to identify the aggBlocks
+   *
+   * @return the aggBlocks
+   */
+  public boolean[] getAggBlocks() {
+    return aggBlocks;
+  }
+
+  /**
+   * set array of aggBlocks flag to identify the aggBlocks
+   *
+   * @param aggBlocks the aggBlocks to set
+   */
+  public void setAggBlocks(boolean[] aggBlocks) {
+    this.aggBlocks = aggBlocks;
+  }
+
+  /**
+   * @return the colGrpBlock flags, true if given index is colgroup block
+   */
+  public boolean[] getColGrpBlocks() {
+    return this.colGrpBlock;
+  }
+
+  /**
+   * @param colGrpBlock true if block is column group
+   */
+  public void setColGrpBlocks(boolean[] colGrpBlock) {
+    this.colGrpBlock = colGrpBlock;
+  }
+
+  /**
+   * @return the measureNullValueIndex
+   */
+  public BitSet[] getMeasureNullValueIndex() {
+    return measureNullValueIndex;
+  }
+
+  /**
+   * @param measureNullValueIndex the measureNullValueIndex to set
+   */
+  public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
+    this.measureNullValueIndex = measureNullValueIndex;
+  }
+
+  public int getTotalDimensionArrayLength() {
+    return totalDimensionArrayLength;
+  }
+
+  public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
+    this.totalDimensionArrayLength = totalDimensionArrayLength;
+  }
+
+  public int getTotalMeasureArrayLength() {
+    return totalMeasureArrayLength;
+  }
+
+  public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
+    this.totalMeasureArrayLength = totalMeasureArrayLength;
+  }
+
+  public byte[][] getMeasureColumnMaxData() {
+    return measureColumnMaxData;
+  }
+
+  public void setMeasureColumnMaxData(byte[][] measureColumnMaxData) {
+    this.measureColumnMaxData = measureColumnMaxData;
+  }
+
+  public byte[][] getMeasureColumnMinData() {
+    return measureColumnMinData;
+  }
+
+  public void setMeasureColumnMinData(byte[][] measureColumnMinData) {
+    this.measureColumnMinData = measureColumnMinData;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 3935bdc..2c6c890 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -155,7 +155,7 @@ public class CarbonMetadataUtilTest {
     segmentInfo.setNum_cols(0);
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     IndexHeader indexHeader = new IndexHeader();
-    indexHeader.setVersion(2);
+    indexHeader.setVersion(3);
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
     indexHeader.setBucket_id(0);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 759fbf7..3114ee1 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -127,10 +127,21 @@ struct DataChunk2{
     7: optional SortState sort_state;
     8: optional list<schema.Encoding> encoders; // The List of encoders overriden at node level
     9: optional list<binary> encoder_meta; // extra information required by encoders
-}
+    10: optional BlockletMinMaxIndex min_max; 
+    11: optional i32 numberOfRowsInpage;
+ }
 
 
 /**
+* Represents a column chunk made up of pages: a list of DataChunk2 entries plus optional page offset and page length lists.
+**/
+struct DataChunk3{
+    1: required list<DataChunk2> data_chunk_list; // DataChunk2 entries contained in this chunk
+    2: optional list<i32> page_offset; // page offsets (NOTE(review): presumably one per data_chunk_list entry - confirm)
+    3: optional list<i32> page_length; // page lengths (NOTE(review): presumably same order as page_offset - confirm)
+   
+ }
+/**
 *	Information about a blocklet
 */
 struct BlockletInfo{
@@ -146,7 +157,16 @@ struct BlockletInfo2{
     2: required list<i64> column_data_chunks_offsets;	// Information about offsets all column chunks in this blocklet
     3: required list<i16> column_data_chunks_length;	// Information about length all column chunks in this blocklet
 }
-
+/**
+*	Information about a blocklet, used by FileFooter.blocklet_info_list3
+*/
+struct BlockletInfo3{
+    1: required i32 num_rows;	// Number of rows in this blocklet
+    2: required list<i64> column_data_chunks_offsets;	// Offsets of all column chunks in this blocklet
+    3: required list<i32> column_data_chunks_length;	// Lengths of all column chunks in this blocklet (i32 here; BlockletInfo2 uses i16)
+    4: required i64 dimension_offsets;	// NOTE(review): undocumented - presumably where the dimension chunks start; confirm
+    5: required i64 measure_offsets;	// NOTE(review): undocumented - presumably where the measure chunks start; confirm
+  }
 /**
 * Footer for indexed carbon file
 */
@@ -158,7 +178,8 @@ struct FileFooter{
     5: required list<BlockletIndex> blocklet_index_list;	// blocklet index of all blocklets in this file
     6: optional list<BlockletInfo> blocklet_info_list;	// Information about blocklets of all columns in this file
     7: optional list<BlockletInfo2> blocklet_info_list2;	// Information about blocklets of all columns in this file
-    8: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
+    8: optional list<BlockletInfo3> blocklet_info_list3;	// Information about blocklets of all columns in this file
+    9: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 828ece8..3f75cd1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
 import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;
+import org.apache.carbondata.processing.store.writer.v3.CarbonFactDataWriterImplV3;
 
 /**
  * Factory class to get the writer instance
@@ -62,8 +63,12 @@ public class CarbonDataWriterFactory {
     switch (version) {
       case V1:
         return new CarbonFactDataWriterImplV1(carbonDataWriterVo);
-      default:
+      case V2:
         return new CarbonFactDataWriterImplV2(carbonDataWriterVo);
+      case V3:
+        return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
+      default:
+        return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index bf66700..0699167 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -40,9 +40,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
@@ -59,6 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.mdkeygen.file.FileManager;
@@ -70,7 +73,6 @@ import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
 import org.apache.carbondata.processing.store.colgroup.DataHolder;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
 
@@ -257,6 +259,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private int bucketNumber;
 
   /**
+   * current data format version
+   */
+  private ColumnarFormatVersion version;
+
+  /**
    * CarbonFactDataHandler constructor
    */
   public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -326,6 +333,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       }
       aggKeyBlock = arrangeUniqueBlockType(aggKeyBlock);
     }
+    version = CarbonProperties.getInstance().getFormatVersion();
   }
 
   private void initParameters(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -476,7 +484,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
         max[i] = -Double.MAX_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        max[i] = new BigDecimal(0.0);
+        max[i] = new BigDecimal(-Double.MAX_VALUE);
       } else {
         max[i] = 0.0;
       }
@@ -748,9 +756,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   // TODO remove after kettle flow is removed
   private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues,
       int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
-      WriterCompressModel compressionModel, byte[][] noDictionaryData,
-      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey)
-      throws CarbonDataWriterException {
+      WriterCompressModel compressionModel, byte[][] noDictionaryData, byte[] noDictionaryStartKey,
+      byte[] noDictionaryEndKey) throws CarbonDataWriterException {
     byte[][][] noDictionaryColumnsData = null;
     List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
     int complexColCount = getComplexColsCount();
@@ -836,9 +843,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       if (dimensionType[i]) {
         dictionaryColumnCount++;
         if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-          submit.add(executorService
-              .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
-                  true, isUseInvertedIndex[i])));
+          submit.add(executorService.submit(
+              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+                  isUseInvertedIndex[i])));
         } else {
           submit.add(
               executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -876,8 +883,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal,
       byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
       WriterCompressModel compressionModel, byte[][][] noDictionaryData,
-      byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey)
-      throws CarbonDataWriterException {
+      byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey) throws CarbonDataWriterException {
     byte[][][] noDictionaryColumnsData = null;
     List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
     int complexColCount = getComplexColsCount();
@@ -907,7 +913,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             int keyLength = splitKey[j].length;
             byte[] newKey = new byte[keyLength + 2];
             ByteBuffer buffer = ByteBuffer.wrap(newKey);
-            buffer.putShort((short)keyLength);
+            buffer.putShort((short) keyLength);
             System.arraycopy(splitKey[j], 0, newKey, 2, keyLength);
             noDictionaryColumnsData[j][i] = newKey;
           }
@@ -963,9 +969,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       if (dimensionType[i]) {
         dictionaryColumnCount++;
         if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-          submit.add(executorService
-              .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
-                  true, isUseInvertedIndex[i])));
+          submit.add(executorService.submit(
+              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+                  isUseInvertedIndex[i])));
         } else {
           submit.add(
               executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -1008,7 +1014,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey);
   }
 
-
   /**
    * DataHolder will have all row mdkey data
    *
@@ -1150,6 +1155,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
     return decimalPlaces;
   }
+
   /**
    * This method will be used to update the max value for each measure
    */
@@ -1220,7 +1226,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
-    LOGGER.info("Blocklet Size: " + blockletSize);
+    if (version == ColumnarFormatVersion.V3) {
+      this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+              CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
+    }
+    LOGGER.info("Number of rows per column blocklet " + blockletSize);
     dataRows = new ArrayList<>(this.blockletSize);
     int dimSet =
         Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
@@ -1280,8 +1291,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         .getBlockKeySize());
     System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
         blockKeySize.length - noOfColStore);
-    this.dataWriter =
-        getFactDataWriter(keyBlockSize);
+    this.dataWriter = getFactDataWriter(keyBlockSize);
     this.dataWriter.setIsNoDictionary(isNoDictionary);
     // initialize the channel;
     this.dataWriter.initializeWriter();
@@ -1377,7 +1387,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @return data writer instance
    */
   private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
-    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
     return CarbonDataWriterFactory.getInstance()
         .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
   }
@@ -1620,8 +1629,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
     @Override public IndexStorage call() throws Exception {
       if (isUseInvertedIndex) {
-        return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
-            isSortRequired);
+        if (version == ColumnarFormatVersion.V3) {
+          return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
+              isSortRequired);
+        } else {
+          return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
+              isSortRequired);
+        }
       } else {
         return new BlockIndexerStorageForNoInvertedIndex(this.data, isCompressionReq,
             isNoDictionary);


[3/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code

Posted by ja...@apache.org.
Added V3 Format Writer and Reader Code

Added code to support V3 Writer + Reader


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2cf1104d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2cf1104d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2cf1104d

Branch: refs/heads/master
Commit: 2cf1104db43a5591fe2bcabb97ba02202428132a
Parents: 922683e
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Feb 23 16:44:41 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sun Feb 26 22:42:32 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  38 +-
 .../constants/CarbonV3DataFormatConstants.java  |  84 ++++
 .../datastore/chunk/AbstractRawColumnChunk.java |  11 +
 .../chunk/reader/CarbonDataReaderFactory.java   |  14 +-
 .../AbstractChunkReaderV2V3Format.java          | 126 +++++
 ...mpressedDimensionChunkFileBasedReaderV2.java | 127 ++---
 ...mpressedDimensionChunkFileBasedReaderV3.java | 268 ++++++++++
 .../AbstractMeasureChunkReaderV2V3Format.java   | 124 +++++
 ...CompressedMeasureChunkFileBasedReaderV2.java | 106 +---
 ...CompressedMeasureChunkFileBasedReaderV3.java | 239 +++++++++
 .../SafeFixedLengthDimensionDataChunkStore.java |   9 +-
 .../columnar/BlockIndexerStorageForShort.java   | 228 +++++++++
 .../columnar/ColumnWithShortIndex.java          |  76 +++
 .../ColumnWithShortIndexForNoDictionay.java     |  46 ++
 .../core/metadata/ColumnarFormatVersion.java    |   9 +-
 .../executor/impl/AbstractQueryExecutor.java    |  10 +-
 .../IncludeColGroupFilterExecuterImpl.java      |  24 +
 ...velRangeLessThanEqualFilterExecuterImpl.java |  14 +-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  14 +-
 .../carbondata/core/util/BitSetGroup.java       |   6 +-
 .../core/util/CarbonMetadataUtil.java           | 347 ++++++++++++-
 .../carbondata/core/util/CarbonProperties.java  | 110 +++-
 .../apache/carbondata/core/util/CarbonUtil.java | 110 ++++
 .../util/DataFileFooterConverterFactory.java    |   5 +-
 .../core/util/DataFileFooterConverterV3.java    | 141 ++++++
 .../apache/carbondata/core/util/NodeHolder.java | 430 ++++++++++++++++
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 format/src/main/thrift/carbondata.thrift        |  27 +-
 .../store/CarbonDataWriterFactory.java          |   7 +-
 .../store/CarbonFactDataHandlerColumnar.java    |  56 ++-
 .../store/writer/AbstractFactDataWriter.java    |  54 +-
 .../store/writer/CarbonFactDataWriter.java      |   1 +
 .../processing/store/writer/NodeHolder.java     | 410 ---------------
 .../writer/v1/CarbonFactDataWriterImplV1.java   |   9 +-
 .../writer/v2/CarbonFactDataWriterImplV2.java   |   8 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 499 +++++++++++++++++++
 .../store/writer/v3/DataWriterHolder.java       |  68 +++
 37 files changed, 3114 insertions(+), 743 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 1142c4e..146b78e 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -83,7 +83,7 @@ public final class CarbonCommonConstants {
   /**
    * min blocklet size
    */
-  public static final int BLOCKLET_SIZE_MIN_VAL = 50;
+  public static final int BLOCKLET_SIZE_MIN_VAL = 2000;
   /**
    * max blocklet size
    */
@@ -791,7 +791,7 @@ public final class CarbonCommonConstants {
   public static final String CARBON_MERGE_SORT_PREFETCH_DEFAULT = "true";
 
   /**
-   *  default name of data base
+   * default name of data base
    */
   public static final String DATABASE_DEFAULT_NAME = "default";
 
@@ -808,8 +808,7 @@ public final class CarbonCommonConstants {
   /**
    * this variable is to enable/disable identify high cardinality during first data loading
    */
-  public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE =
-      "high.cardinality.identify.enable";
+  public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE = "high.cardinality.identify.enable";
   public static final String HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT = "true";
 
   /**
@@ -843,26 +842,23 @@ public final class CarbonCommonConstants {
   /**
    * ZOOKEEPERLOCK TYPE
    */
-  public static final String CARBON_LOCK_TYPE_ZOOKEEPER =
-      "ZOOKEEPERLOCK";
+  public static final String CARBON_LOCK_TYPE_ZOOKEEPER = "ZOOKEEPERLOCK";
 
   /**
    * LOCALLOCK TYPE
    */
-  public static final String CARBON_LOCK_TYPE_LOCAL =
-      "LOCALLOCK";
+  public static final String CARBON_LOCK_TYPE_LOCAL = "LOCALLOCK";
 
   /**
    * HDFSLOCK TYPE
    */
-  public static final String CARBON_LOCK_TYPE_HDFS =
-      "HDFSLOCK";
+  public static final String CARBON_LOCK_TYPE_HDFS = "HDFSLOCK";
 
   /**
    * Invalid filter member log string
    */
-  public static final String FILTER_INVALID_MEMBER = " Invalid Record(s) are present "
-                                                     + "while filter evaluation. ";
+  public static final String FILTER_INVALID_MEMBER =
+      " Invalid Record(s) are present while filter evaluation. ";
 
   /**
    * Number of unmerged segments to be merged.
@@ -880,25 +876,23 @@ public final class CarbonCommonConstants {
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
   public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
-          "carbon.horizontal.update.compaction.threshold";
+      "carbon.horizontal.update.compaction.threshold";
   /**
    * Default count of segments which act as a threshold for IUD compaction merge.
    */
   public static final String DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
 
-
   /**
    * Number of Delete Delta files which is the Threshold for IUD compaction.
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
-  public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION  =
+  public static final String DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
       "carbon.horizontal.delete.compaction.threshold";
   /**
    * Default count of segments which act as a threshold for IUD compaction merge.
    */
   public static final String DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
 
-
   /**
    * default location of the carbon metastore db
    */
@@ -943,8 +937,7 @@ public final class CarbonCommonConstants {
    * @Deprecated : This property has been deprecated.
    * Property for enabling system level compaction lock.1 compaction can run at once.
    */
-  public static String ENABLE_CONCURRENT_COMPACTION =
-      "carbon.concurrent.compaction";
+  public static String ENABLE_CONCURRENT_COMPACTION = "carbon.concurrent.compaction";
 
   /**
    * Default value of Property for enabling system level compaction lock.1 compaction can run
@@ -1024,12 +1017,8 @@ public final class CarbonCommonConstants {
   /**
    * current data file version
    */
-  public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V2";
-  /**
-   * number of column data will read in IO operation
-   * during query execution
-   */
-  public static final short NUMBER_OF_COLUMN_READ_IN_IO = 10;
+  public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V3";
+
   /**
    * data file version header
    */
@@ -1105,7 +1094,6 @@ public final class CarbonCommonConstants {
 
   /**
    * Default carbon dictionary server port
-
    */
   public static final String DICTIONARY_SERVER_PORT_DEFAULT = "2030";
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
new file mode 100644
index 0000000..060b55c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonV3DataFormatConstants.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.constants;
+
+/**
+ * Setting names, String defaults and short min/max bounds for the V3 data format
+ */
+public interface CarbonV3DataFormatConstants {
+
+  /**
+   * name of the setting for the number of pages in one blocklet column
+   */
+  String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN = "carbon.number.of.page.in.blocklet.column";
+
+  /**
+   * default number of pages in one blocklet column, kept as a String
+   */
+  String NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE = "10";
+
+  /**
+   * maximum number of pages in one blocklet column
+   */
+  short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX = 20;
+
+  /**
+   * minimum number of pages in one blocklet column
+   */
+  short NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN = 1;
+
+  /**
+   * name of the setting for the number of columns read in one IO during a query
+   */
+  String NUMBER_OF_COLUMN_TO_READ_IN_IO = "number.of.column.to.read.in.io";
+
+  /**
+   * default number of columns read in one IO during a query, kept as a String
+   */
+  String NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE = "10";
+
+  /**
+   * maximum number of columns read in one IO during a query
+   */
+  short NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX = 20;
+
+  /**
+   * minimum number of columns read in one IO during a query
+   */
+  short NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN = 1;
+
+  /**
+   * name of the setting for the number of rows in one blocklet column page
+   */
+  String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE = "number.of.rows.per.blocklet.column.page";
+
+  /**
+   * default number of rows in one blocklet column page, kept as a String
+   */
+  String NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT = "32000";
+
+  /**
+   * maximum number of rows in one blocklet column page (32000 still fits in a short)
+   */
+  short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX = 32000;
+
+  /**
+   * minimum number of rows in one blocklet column page
+   */
+  short NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN = 8000;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
index d04077c..eebb382 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/AbstractRawColumnChunk.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datastore.chunk;
 
 import java.nio.ByteBuffer;
 
+import org.apache.carbondata.format.DataChunk3;
 
 /**
  * It contains group of uncompressed blocklets on one column.
@@ -44,6 +45,8 @@ public abstract class AbstractRawColumnChunk {
 
   protected int length;
 
+  protected DataChunk3 dataChunkV3;
+
   public AbstractRawColumnChunk(int blockletId, ByteBuffer rawData, int offSet, int length) {
     this.blockletId = blockletId;
     this.rawData = rawData;
@@ -121,4 +124,12 @@ public abstract class AbstractRawColumnChunk {
     return length;
   }
 
+  public DataChunk3 getDataChunkV3() {
+    return dataChunkV3; // NOTE(review): presumably null until setDataChunkV3 is called - confirm
+  }
+
+  public void setDataChunkV3(DataChunk3 dataChunkV3) {
+    this.dataChunkV3 = dataChunkV3; // plain assignment; the given object is stored as is
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
index e20fcbe..8fee760 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -18,8 +18,10 @@ package org.apache.carbondata.core.datastore.chunk.reader;
 
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.v3.CompressedDimensionChunkFileBasedReaderV3;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.v2.CompressedMeasureChunkFileBasedReaderV2;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.v3.CompressedMeasureChunkFileBasedReaderV3;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 
@@ -65,9 +67,13 @@ public class CarbonDataReaderFactory {
       case V1:
         return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, eachColumnValueSize,
             filePath);
-      default:
+      case V2:
         return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, eachColumnValueSize,
             filePath);
+      case V3:
+      default:
+        return new CompressedDimensionChunkFileBasedReaderV3(blockletInfo, eachColumnValueSize,
+            filePath);
     }
   }
 
@@ -84,8 +90,12 @@ public class CarbonDataReaderFactory {
     switch (version) {
       case V1:
         return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, filePath);
-      default:
+      case V2:
         return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, filePath);
+      case V3:
+      default:
+        return new CompressedMeasureChunkFileBasedReaderV3(blockletInfo, filePath);
+
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
new file mode 100644
index 0000000..f083612
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/AbstractChunkReaderV2V3Format.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.format.Encoding;
+
+/**
+ * Abstract class for V2, V3 format dimension column reader
+ */
+public abstract class AbstractChunkReaderV2V3Format extends AbstractChunkReader {
+
+  /**
+   * dimension chunks offset
+   */
+  protected List<Long> dimensionChunksOffset;
+
+  /**
+   * dimension chunks length
+   */
+  protected List<Integer> dimensionChunksLength;
+
+  public AbstractChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+      final int[] eachColumnValueSize, final String filePath) {
+    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
+    dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
+    dimensionChunksLength = blockletInfo.getDimensionChunksLength();
+  }
+
+  /**
+   * Below method will be used to read the chunks based on block indexes.
+   * Reading logic of below method is:
+   * Except the last column, all the column chunks can be read in group:
+   * if not the last column then read the data of all the columns present in the
+   * block index range together and then process it.
+   * The last column is read separately and then processed.
+   *
+   * @param fileReader      file reader to read the blocks from file
+   * @param blockletIndexes blocks range to be read
+   * @return dimension column chunks
+   */
+  @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
+      final int[][] blockletIndexes) throws IOException {
+    // read the column chunk based on block index and add
+    DimensionRawColumnChunk[] dataChunks =
+        new DimensionRawColumnChunk[dimensionChunksOffset.size()];
+    // if blocklet index is empty then return empty data chunk
+    if (blockletIndexes.length == 0) {
+      return dataChunks;
+    }
+    DimensionRawColumnChunk[] groupChunk = null;
+    int index = 0;
+    // iterate till block indexes -1 as block index will be in sorted order, so to avoid
+    // the last column reading in group
+    for (int i = 0; i < blockletIndexes.length - 1; i++) {
+      index = 0;
+      groupChunk =
+          readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
+      for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    // check last index is present in block index, if it is present then read separately
+    if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
+      dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
+          readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
+    }
+    // otherwise read the data in group
+    else {
+      groupChunk =
+          readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
+              blockletIndexes[blockletIndexes.length - 1][1]);
+      index = 0;
+      for (int j = blockletIndexes[blockletIndexes.length - 1][0];
+           j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
+        dataChunks[j] = groupChunk[index++];
+      }
+    }
+    return dataChunks;
+  }
+
+  /**
+   * Below method will be used to read dimension chunk data in group.
+   * This method will be useful to avoid multiple IO while reading the
+   * data from the file
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read
+   * @return dimension raw chunk array
+   * @throws IOException
+   */
+  protected abstract DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+
+  /**
+   * Below method will be used to check whether a particular encoding is present or not
+   *
+   * @param encodings list of encodings to search in
+   * @param encoding  encoding to search
+   * @return if encoding is present in the given encodings
+   */
+  protected boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
+    return encodings.contains(encoding);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 9d5849f..b2201cd 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -18,7 +18,6 @@ package org.apache.carbondata.core.datastore.chunk.reader.dimension.v2;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
@@ -26,7 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataC
 import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -36,17 +35,7 @@ import org.apache.carbondata.format.Encoding;
 /**
  * Compressed dimension chunk reader class for version 2
  */
-public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReader {
-
-  /**
-   * dimension chunks offset
-   */
-  private List<Long> dimensionChunksOffset;
-
-  /**
-   * dimension chunks length
-   */
-  private List<Integer> dimensionChunksLength;
+public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkReaderV2V3Format {
 
   /**
    * Constructor to get minimum parameter to create instance of this class
@@ -57,73 +46,18 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
    */
   public CompressedDimensionChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final int[] eachColumnValueSize, final String filePath) {
-    super(eachColumnValueSize, filePath, blockletInfo.getNumberOfRows());
-    this.dimensionChunksOffset = blockletInfo.getDimensionChunkOffsets();
-    this.dimensionChunksLength = blockletInfo.getDimensionChunksLength();
-
-  }
-
-  /**
-   * Below method will be used to read the chunk based on block indexes
-   * Reading logic of below method is:
-   * Except last column all the column chunk can be read in group
-   * if not last column then read data of all the column present in block index
-   * together then process it.
-   * For last column read is separately and process
-   *
-   * @param fileReader   file reader to read the blocks from file
-   * @param blockletIndexes blocks range to be read
-   * @return dimension column chunks
-   */
-  @Override public DimensionRawColumnChunk[] readRawDimensionChunks(final FileHolder fileReader,
-      final int[][] blockletIndexes) throws IOException {
-    // read the column chunk based on block index and add
-    DimensionRawColumnChunk[] dataChunks =
-        new DimensionRawColumnChunk[dimensionChunksOffset.size()];
-    // if blocklet index is empty then return empry data chunk
-    if (blockletIndexes.length == 0) {
-      return dataChunks;
-    }
-    DimensionRawColumnChunk[] groupChunk = null;
-    int index = 0;
-    // iterate till block indexes -1 as block index will be in sorted order, so to avoid
-    // the last column reading in group
-    for (int i = 0; i < blockletIndexes.length - 1; i++) {
-      index = 0;
-      groupChunk =
-          readRawDimensionChunksInGroup(fileReader, blockletIndexes[i][0], blockletIndexes[i][1]);
-      for (int j = blockletIndexes[i][0]; j <= blockletIndexes[i][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    // check last index is present in block index, if it is present then read separately
-    if (blockletIndexes[blockletIndexes.length - 1][0] == dimensionChunksOffset.size() - 1) {
-      dataChunks[blockletIndexes[blockletIndexes.length - 1][0]] =
-          readRawDimensionChunk(fileReader, blockletIndexes[blockletIndexes.length - 1][0]);
-    }
-    // otherwise read the data in group
-    else {
-      groupChunk =
-          readRawDimensionChunksInGroup(fileReader, blockletIndexes[blockletIndexes.length - 1][0],
-              blockletIndexes[blockletIndexes.length - 1][1]);
-      index = 0;
-      for (int j = blockletIndexes[blockletIndexes.length - 1][0];
-           j <= blockletIndexes[blockletIndexes.length - 1][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    return dataChunks;
+    super(blockletInfo, eachColumnValueSize, filePath);
   }
 
   /**
    * Below method will be used to read the chunk based on block index
    *
-   * @param fileReader file reader to read the blocks from file
+   * @param fileReader    file reader to read the blocks from file
    * @param blockletIndex block to be read
    * @return dimension column chunk
    */
-  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
-      int blockletIndex) throws IOException {
+  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader, int blockletIndex)
+      throws IOException {
     int length = 0;
     if (dimensionChunksOffset.size() - 1 == blockletIndex) {
       // Incase of last block read only for datachunk and read remaining while converting it.
@@ -140,24 +74,35 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
         new DimensionRawColumnChunk(blockletIndex, buffer, 0, length, this);
     rawColumnChunk.setFileHolder(fileReader);
     rawColumnChunk.setPagesCount(1);
-    rawColumnChunk.setRowCount(new int[]{numberOfRows});
+    rawColumnChunk.setRowCount(new int[] { numberOfRows });
     return rawColumnChunk;
   }
 
-  private DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
-      int startBlockIndex, int endBlockIndex) throws IOException {
-    long currentDimensionOffset = dimensionChunksOffset.get(startBlockIndex);
+  /**
+   * Below method will be used to read dimension chunk data in group.
+   * This method will be useful to avoid multiple IO while reading the
+   * data from the file
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read
+   * @return dimension raw chunk array
+   * @throws IOException
+   */
+  protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+    long currentDimensionOffset = dimensionChunksOffset.get(startColumnBlockletIndex);
     ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+        (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
     synchronized (fileReader) {
       fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
-          (int) (dimensionChunksOffset.get(endBlockIndex + 1) - currentDimensionOffset));
+          (int) (dimensionChunksOffset.get(endColumnBlockletIndex + 1) - currentDimensionOffset));
     }
     DimensionRawColumnChunk[] dataChunks =
-        new DimensionRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+        new DimensionRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
     int index = 0;
     int runningLength = 0;
-    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+    for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
       int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
       dataChunks[index] =
           new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
@@ -181,8 +126,8 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     int blockIndex = dimensionRawColumnChunk.getBlockletId();
     ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
     if (dimensionChunksOffset.size() - 1 == blockIndex) {
-      dimensionColumnChunk = CarbonUtil
-          .readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
+      dimensionColumnChunk =
+          CarbonUtil.readDataChunk(rawData, copySourcePoint, dimensionRawColumnChunk.getLength());
       int totalDimensionDataLength =
           dimensionColumnChunk.data_page_length + dimensionColumnChunk.rle_page_length
               + dimensionColumnChunk.rowid_page_length;
@@ -202,8 +147,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     rawData.position(copySourcePoint);
     rawData.get(data);
     // first read the data and uncompressed it
-    dataPage =
-        COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+    dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
     copySourcePoint += dimensionColumnChunk.data_page_length;
     // if row id block is present then read the row id chunk and uncompress it
     if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
@@ -223,8 +167,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
       byte[] dataRle = new byte[dimensionColumnChunk.rle_page_length];
       rawData.position(copySourcePoint);
       rawData.get(dataRle);
-      rlePage =
-          numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
+      rlePage = numberComressor.unCompress(dataRle, 0, dimensionColumnChunk.rle_page_length);
       // uncompress the data with rle indexes
       dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage, eachColumnValueSize[blockIndex]);
     }
@@ -250,16 +193,4 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     }
     return columnDataChunk;
   }
-
-  /**
-   * Below method will be used to check whether particular encoding is present
-   * in the dimension or not
-   *
-   * @param encoding encoding to search
-   * @return if encoding is present in dimension
-   */
-  private boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
-    return encodings.contains(encoding);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..acaa2fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.dimension.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.ColumnGroupDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionDataChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+import org.apache.carbondata.format.Encoding;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Dimension column V3 Reader class which will be used to read and uncompress
+ * V3 format data
+ *
+ * Data Format
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkReaderV2V3Format {
+
+  /**
+   * end position of last dimension in carbon data file
+   */
+  private long lastDimensionOffsets;
+
+  public CompressedDimensionChunkFileBasedReaderV3(BlockletInfo blockletInfo,
+      int[] eachColumnValueSize, String filePath) {
+    super(blockletInfo, eachColumnValueSize, filePath);
+    lastDimensionOffsets = blockletInfo.getDimensionOffset();
+  }
+
+  /**
+   * Below method will be used to read the dimension column data from carbon data file
+   * Steps for reading
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from data read
+   * 5. Create the raw chunk object and fill the details
+   *
+   * @param fileReader          reader for reading the column from carbon data file
+   * @param blockletColumnIndex blocklet index of the column in carbon data file
+   * @return dimension raw chunk
+   */
+  public DimensionRawColumnChunk readRawDimensionChunk(FileHolder fileReader,
+      int blockletColumnIndex) throws IOException {
+    // get the current dimension offset
+    long currentDimensionOffset = dimensionChunksOffset.get(blockletColumnIndex);
+    int length = 0;
+    // to calculate the length of the data to be read:
+    // for a column other than the last column we can subtract the offset of the current
+    // column from the offset of the next column and get the total length.
+    // but for last column we need to use lastDimensionOffsets which is the end position
+    // of the last dimension, we can subtract current dimension offset from lastDimensionOffsets
+    if (dimensionChunksOffset.size() - 1 == blockletColumnIndex) {
+      length = (int) (lastDimensionOffsets - currentDimensionOffset);
+    } else {
+      length = (int) (dimensionChunksOffset.get(blockletColumnIndex + 1) - currentDimensionOffset);
+    }
+    // allocate the buffer
+    ByteBuffer buffer = ByteBuffer.allocateDirect(length);
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset, length);
+    }
+    // get the data chunk which will have all the details about the data pages
+    DataChunk3 dataChunk = CarbonUtil.readDataChunk3(buffer, 0, length);
+    // creating a raw chunks instance and filling all the details
+    DimensionRawColumnChunk rawColumnChunk =
+        new DimensionRawColumnChunk(blockletColumnIndex, buffer, 0, length, this);
+    int numberOfPages = dataChunk.getPage_length().size();
+    byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+    byte[][] minValueOfEachPage = new byte[numberOfPages][];
+    int[] eachPageLength = new int[numberOfPages];
+    for (int i = 0; i < minValueOfEachPage.length; i++) {
+      maxValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+      minValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+      eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+    }
+    rawColumnChunk.setDataChunkV3(dataChunk);
+    rawColumnChunk.setFileHolder(fileReader);
+    rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+    rawColumnChunk.setMaxValues(maxValueOfEachPage);
+    rawColumnChunk.setMinValues(minValueOfEachPage);
+    rawColumnChunk.setRowCount(eachPageLength);
+    rawColumnChunk.setLengths(ArrayUtils
+        .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+    rawColumnChunk.setOffsets(ArrayUtils
+        .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+    return rawColumnChunk;
+  }
+
+  /**
+   * Below method will be used to read the multiple dimension column data in group
+   * and divide into dimension raw chunk object
+   * Steps for reading
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from file for each column
+   * 5. Create the raw chunk object and fill the details for each column
+   * 6. increment the offset of the data
+   *
+   * @param fileReader
+   *        reader which will be used to read the dimension columns data from file
+   * @param startBlockletColumnIndex
+   *        blocklet index of the first dimension column
+   * @param endBlockletColumnIndex
+   *        blocklet index of the last dimension column
+   * @return DimensionRawColumnChunk array
+   */
+  protected DimensionRawColumnChunk[] readRawDimensionChunksInGroup(FileHolder fileReader,
+      int startBlockletColumnIndex, int endBlockletColumnIndex) throws IOException {
+    // to calculate the length of the data to be read
+    // we can subtract the offset of the start column from the offset of
+    // end column+1 and get the total length.
+    long currentDimensionOffset = dimensionChunksOffset.get(startBlockletColumnIndex);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(
+        (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentDimensionOffset,
+          (int) (dimensionChunksOffset.get(endBlockletColumnIndex + 1) - currentDimensionOffset));
+    }
+    // create raw chunk for each dimension column
+    DimensionRawColumnChunk[] dimensionDataChunks =
+        new DimensionRawColumnChunk[endBlockletColumnIndex - startBlockletColumnIndex + 1];
+    int index = 0;
+    int runningLength = 0;
+    for (int i = startBlockletColumnIndex; i <= endBlockletColumnIndex; i++) {
+      int currentLength = (int) (dimensionChunksOffset.get(i + 1) - dimensionChunksOffset.get(i));
+      dimensionDataChunks[index] =
+          new DimensionRawColumnChunk(i, buffer, runningLength, currentLength, this);
+      DataChunk3 dataChunk =
+          CarbonUtil.readDataChunk3(buffer, runningLength, dimensionChunksLength.get(i));
+      int numberOfPages = dataChunk.getPage_length().size();
+      byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+      byte[][] minValueOfEachPage = new byte[numberOfPages][];
+      int[] eachPageLength = new int[numberOfPages];
+      for (int j = 0; j < minValueOfEachPage.length; j++) {
+        maxValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+        minValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+        eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+      }
+      dimensionDataChunks[index].setDataChunkV3(dataChunk);
+      dimensionDataChunks[index].setFileHolder(fileReader);
+      dimensionDataChunks[index].setPagesCount(dataChunk.getPage_length().size());
+      dimensionDataChunks[index].setMaxValues(maxValueOfEachPage);
+      dimensionDataChunks[index].setMinValues(minValueOfEachPage);
+      dimensionDataChunks[index].setRowCount(eachPageLength);
+      dimensionDataChunks[index].setLengths(ArrayUtils
+          .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+      dimensionDataChunks[index].setOffsets(ArrayUtils
+          .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+      runningLength += currentLength;
+      index++;
+    }
+    return dimensionDataChunks;
+  }
+
+  /**
+   * Below method will be used to convert the compressed dimension chunk raw data to actual data
+   *
+   * @param dimensionRawColumnChunk dimension raw chunk
+   * @param pageNumber              index of the page to be converted
+   * @return DimensionColumnDataChunk
+   */
+  @Override public DimensionColumnDataChunk convertToDimensionChunk(
+      DimensionRawColumnChunk dimensionRawColumnChunk, int pageNumber) throws IOException {
+    byte[] dataPage = null;
+    int[] invertedIndexes = null;
+    int[] invertedIndexesReverse = null;
+    int[] rlePage = null;
+    // data chunk of page
+    DataChunk2 dimensionColumnChunk = null;
+    // data chunk of blocklet column
+    DataChunk3 dataChunk3 = dimensionRawColumnChunk.getDataChunkV3();
+    // get the data buffer
+    ByteBuffer rawData = dimensionRawColumnChunk.getRawData();
+    dimensionColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+    // calculating the start point of data
+    // as buffer can contain multiple column data, start point will be datachunkoffset +
+    // data chunk length + page offset
+    int copySourcePoint = dimensionRawColumnChunk.getOffSet() + dimensionChunksLength
+        .get(dimensionRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+    byte[] data = new byte[dimensionColumnChunk.data_page_length];
+    rawData.position(copySourcePoint);
+    rawData.get(data);
+    // first read the data and uncompressed it
+    dataPage = COMPRESSOR.unCompressByte(data, 0, dimensionColumnChunk.data_page_length);
+    copySourcePoint += dimensionColumnChunk.data_page_length;
+    // if row id block is present then read the row id chunk and uncompress it
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.INVERTED_INDEX)) {
+      invertedIndexes = CarbonUtil
+          .getUnCompressColumnIndex(dimensionColumnChunk.rowid_page_length, rawData,
+              copySourcePoint);
+      copySourcePoint += dimensionColumnChunk.rowid_page_length;
+      // get the reverse index
+      invertedIndexesReverse = getInvertedReverseIndex(invertedIndexes);
+    }
+    // if rle is applied then read the rle block chunk and then uncompress
+    // the actual data based on rle block
+    if (hasEncoding(dimensionColumnChunk.encoders, Encoding.RLE)) {
+      rlePage =
+          CarbonUtil.getIntArray(rawData, copySourcePoint, dimensionColumnChunk.rle_page_length);
+      // uncompress the data with rle indexes
+      dataPage = UnBlockIndexer.uncompressData(dataPage, rlePage,
+          eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+      rlePage = null;
+    }
+    // fill chunk attributes
+    DimensionColumnDataChunk columnDataChunk = null;
+
+    if (dimensionColumnChunk.isRowMajor()) {
+      // row major data is stored as a column group dimension chunk
+      columnDataChunk = new ColumnGroupDimensionDataChunk(dataPage,
+          eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()],
+          dimensionRawColumnChunk.getRowCount()[pageNumber]);
+    }
+    // if no dictionary column then first create a no dictionary column chunk
+    // and set to data chunk instance
+    else if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
+      columnDataChunk =
+          new VariableLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+              dimensionRawColumnChunk.getRowCount()[pageNumber]);
+    } else {
+      // to store fixed length column chunk values
+      columnDataChunk =
+          new FixedLengthDimensionDataChunk(dataPage, invertedIndexes, invertedIndexesReverse,
+              dimensionRawColumnChunk.getRowCount()[pageNumber],
+              eachColumnValueSize[dimensionRawColumnChunk.getBlockletId()]);
+    }
+    return columnDataChunk;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
new file mode 100644
index 0000000..a94d08b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
+
+/**
+ * Abstract class for V2, V3 format measure column reader
+ */
+public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasureChunkReader {
+
+  /**
+   * measure column chunks offset, one entry per measure column, as supplied by
+   * {@code BlockletInfo.getMeasureChunkOffsets()}
+   */
+  protected List<Long> measureColumnChunkOffsets;
+
+  /**
+   * measure column chunks length, one entry per measure column, as supplied by
+   * {@code BlockletInfo.getMeasureChunksLength()}
+   */
+  protected List<Integer> measureColumnChunkLength;
+
+  /**
+   * Creates the reader for one blocklet. The number of rows of the blocklet is
+   * handed to the parent reader, the measure chunk offsets and lengths are kept
+   * here for the read methods.
+   *
+   * @param blockletInfo blocklet metadata providing row count, measure chunk offsets and lengths
+   * @param filePath     path of the file the measure chunks are read from
+   */
+  public AbstractMeasureChunkReaderV2V3Format(final BlockletInfo blockletInfo,
+      final String filePath) {
+    super(filePath, blockletInfo.getNumberOfRows());
+    this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
+    this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
+  }
+
+  /**
+   * Reads the raw chunks of the requested measure column ranges.
+   * Every range except the last one is fetched with one grouped read. The last
+   * range is read as a single column when it starts at the final measure column,
+   * otherwise it is fetched with a grouped read as well.
+   *
+   * @param fileReader   file reader to read the blocks from file
+   * @param blockIndexes column ranges to be read, each entry holding the first and the
+   *                     last (inclusive) column index of the range
+   * @return array with one slot per measure column, slots of columns which were not
+   * requested stay null
+   * @throws IOException
+   */
+  public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
+      throws IOException {
+    MeasureRawColumnChunk[] rawChunks =
+        new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
+    if (blockIndexes.length == 0) {
+      return rawChunks;
+    }
+    final int lastRange = blockIndexes.length - 1;
+    // all ranges but the last one are always read in group
+    for (int range = 0; range < lastRange; range++) {
+      int firstColumn = blockIndexes[range][0];
+      int lastColumn = blockIndexes[range][1];
+      MeasureRawColumnChunk[] group =
+          readRawMeasureChunksInGroup(fileReader, firstColumn, lastColumn);
+      for (int column = firstColumn; column <= lastColumn; column++) {
+        rawChunks[column] = group[column - firstColumn];
+      }
+    }
+    int lastRangeStart = blockIndexes[lastRange][0];
+    if (lastRangeStart == measureColumnChunkOffsets.size() - 1) {
+      // the final measure column is read on its own
+      rawChunks[lastRangeStart] = readRawMeasureChunk(fileReader, lastRangeStart);
+    } else {
+      int lastRangeEnd = blockIndexes[lastRange][1];
+      MeasureRawColumnChunk[] group =
+          readRawMeasureChunksInGroup(fileReader, lastRangeStart, lastRangeEnd);
+      for (int column = lastRangeStart; column <= lastRangeEnd; column++) {
+        rawChunks[column] = group[column - lastRangeStart];
+      }
+    }
+    return rawChunks;
+  }
+
+  /**
+   * Builds the wrapper presence meta from its thrift counterpart. The presence
+   * flag is copied as is, the bit stream is uncompressed with the configured
+   * compressor and turned into a bit set.
+   *
+   * @param presentMetadataThrift thrift presence meta
+   * @return wrapper presence meta
+   */
+  protected PresenceMeta getPresenceMeta(
+      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
+    PresenceMeta wrapperMeta = new PresenceMeta();
+    wrapperMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
+    BitSet presentBits = BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
+        .unCompressByte(presentMetadataThrift.getPresent_bit_stream()));
+    wrapperMeta.setBitSet(presentBits);
+    return wrapperMeta;
+  }
+
+  /**
+   * Below method will be used to read measure chunk data in group.
+   * This method will be useful to avoid multiple IO while reading the
+   * data from the carbon data file: all columns of the range are expected to be
+   * fetched together and returned as one raw chunk per column.
+   * The caller copies the result into columns startColumnBlockletIndex to
+   * endColumnBlockletIndex (both inclusive), so the returned array has to hold the
+   * columns of the range in that order.
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read (inclusive)
+   * @return measure raw chunkArray
+   * @throws IOException
+   */
+  protected abstract MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 7ac1578..7b6acee 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -19,37 +19,24 @@ package org.apache.carbondata.core.datastore.chunk.reader.measure.v2;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
 
 import org.apache.carbondata.core.datastore.FileHolder;
 import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
-import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReader;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
-import org.apache.carbondata.core.metadata.blocklet.datachunk.PresenceMeta;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.format.DataChunk2;
 
 /**
  * Class to read the measure column data for version 2
  */
-public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReader {
-
-  /**
-   * measure column chunks offset
-   */
-  private List<Long> measureColumnChunkOffsets;
-
-  /**
-   * measure column chunks length
-   */
-  private List<Integer> measureColumnChunkLength;
+public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChunkReaderV2V3Format {
 
   /**
    * Constructor to get minimum parameter to create instance of this class
@@ -59,69 +46,7 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
    */
   public CompressedMeasureChunkFileBasedReaderV2(final BlockletInfo blockletInfo,
       final String filePath) {
-    super(filePath, blockletInfo.getNumberOfRows());
-    this.measureColumnChunkOffsets = blockletInfo.getMeasureChunkOffsets();
-    this.measureColumnChunkLength = blockletInfo.getMeasureChunksLength();
-  }
-
-  /**
-   * Below method will be used to convert the thrift presence meta to wrapper
-   * presence meta
-   *
-   * @param presentMetadataThrift
-   * @return wrapper presence meta
-   */
-  private static PresenceMeta getPresenceMeta(
-      org.apache.carbondata.format.PresenceMeta presentMetadataThrift) {
-    PresenceMeta presenceMeta = new PresenceMeta();
-    presenceMeta.setRepresentNullValues(presentMetadataThrift.isRepresents_presence());
-    presenceMeta.setBitSet(BitSet.valueOf(CompressorFactory.getInstance().getCompressor()
-        .unCompressByte(presentMetadataThrift.getPresent_bit_stream())));
-    return presenceMeta;
-  }
-
-  /**
-   * Below method will be used to read the chunk based on block indexes
-   * Reading logic of below method is: Except last column all the column chunk
-   * can be read in group if not last column then read data of all the column
-   * present in block index together then process it. For last column read is
-   * separately and process
-   *
-   * @param fileReader   file reader to read the blocks from file
-   * @param blockIndexes blocks range to be read
-   * @return measure column chunks
-   * @throws IOException
-   */
-  public MeasureRawColumnChunk[] readRawMeasureChunks(FileHolder fileReader, int[][] blockIndexes)
-      throws IOException {
-    // read the column chunk based on block index and add
-    MeasureRawColumnChunk[] dataChunks =
-        new MeasureRawColumnChunk[measureColumnChunkOffsets.size()];
-    if (blockIndexes.length == 0) {
-      return dataChunks;
-    }
-    MeasureRawColumnChunk[] groupChunk = null;
-    int index = 0;
-    for (int i = 0; i < blockIndexes.length - 1; i++) {
-      index = 0;
-      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[i][0], blockIndexes[i][1]);
-      for (int j = blockIndexes[i][0]; j <= blockIndexes[i][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    if (blockIndexes[blockIndexes.length - 1][0] == measureColumnChunkOffsets.size() - 1) {
-      dataChunks[blockIndexes[blockIndexes.length - 1][0]] =
-          readRawMeasureChunk(fileReader, blockIndexes[blockIndexes.length - 1][0]);
-    } else {
-      groupChunk = readRawMeasureChunksInGroup(fileReader, blockIndexes[blockIndexes.length - 1][0],
-          blockIndexes[blockIndexes.length - 1][1]);
-      index = 0;
-      for (int j = blockIndexes[blockIndexes.length - 1][0];
-           j <= blockIndexes[blockIndexes.length - 1][1]; j++) {
-        dataChunks[j] = groupChunk[index++];
-      }
-    }
-    return dataChunks;
+    super(blockletInfo, filePath);
   }
 
   @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
@@ -146,20 +71,31 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     return rawColumnChunk;
   }
 
-  private MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
-      int startBlockIndex, int endBlockIndex) throws IOException {
-    long currentMeasureOffset = measureColumnChunkOffsets.get(startBlockIndex);
+  /**
+   * Below method will be used to read measure chunk data in group.
+   * This method will be useful to avoid multiple IO while reading the
+   * data from the carbon data file
+   *
+   * @param fileReader               file reader to read the data
+   * @param startColumnBlockletIndex first column blocklet index to be read
+   * @param endColumnBlockletIndex   end column blocklet index to be read
+   * @return measure raw chunkArray
+   * @throws IOException
+   */
+  protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+    long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
     ByteBuffer buffer = ByteBuffer.allocateDirect(
-        (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+        (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
     synchronized (fileReader) {
       fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset,
-          (int) (measureColumnChunkOffsets.get(endBlockIndex + 1) - currentMeasureOffset));
+          (int) (measureColumnChunkOffsets.get(endColumnBlockletIndex + 1) - currentMeasureOffset));
     }
     MeasureRawColumnChunk[] dataChunks =
-        new MeasureRawColumnChunk[endBlockIndex - startBlockIndex + 1];
+        new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
     int runningLength = 0;
     int index = 0;
-    for (int i = startBlockIndex; i <= endBlockIndex; i++) {
+    for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
       int currentLength =
           (int) (measureColumnChunkOffsets.get(i + 1) - measureColumnChunkOffsets.get(i));
       MeasureRawColumnChunk measureRawColumnChunk =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
new file mode 100644
index 0000000..307af41
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.chunk.reader.measure.v3;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.FileHolder;
+import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
+import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
+
+import org.apache.commons.lang.ArrayUtils;
+
+/**
+ * Measure column V3 Reader class which will be used to read and uncompress
+ * V3 format data
+ *
+ * Data format:
+ * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>>
+ * <Column2 Data ChunkV3><Column2<Page1><Page2><Page3><Page4>>
+ * <Column3 Data ChunkV3><Column3<Page1><Page2><Page3><Page4>>
+ * <Column4 Data ChunkV3><Column4<Page1><Page2><Page3><Page4>>
+ */
+public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChunkReaderV2V3Format {
+
+  /**
+   * end position of last measure in carbon data file, used to compute the length
+   * of the last measure column as it has no following column offset
+   */
+  private long measureOffsets;
+
+  /**
+   * Creates the V3 measure reader for one blocklet.
+   *
+   * @param blockletInfo blocklet metadata, also provides the end position of the measures
+   * @param filePath     path of the carbon data file to read from
+   */
+  public CompressedMeasureChunkFileBasedReaderV3(BlockletInfo blockletInfo, String filePath) {
+    super(blockletInfo, filePath);
+    measureOffsets = blockletInfo.getMeasureOffsets();
+  }
+
+  /**
+   * Below method will be used to read the measure column data from carbon data file
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from data read
+   * 5. Create the raw chunk object and fill the details
+   *
+   * @param fileReader reader for reading the column from carbon data file
+   * @param blockIndex blocklet index of the column in carbon data file
+   * @return measure raw chunk
+   * @throws IOException
+   */
+  @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader, int blockIndex)
+      throws IOException {
+    int dataLength = 0;
+    // to calculate the length of the data to be read
+    // column other than last column we can subtract the offset of current column with
+    // next column and get the total length.
+    // but for last column we need to use measureOffsets which is the end position
+    // of the last measure, we can subtract current measure offset from measureOffsets
+    if (measureColumnChunkOffsets.size() - 1 == blockIndex) {
+      dataLength = (int) (measureOffsets - measureColumnChunkOffsets.get(blockIndex));
+    } else {
+      dataLength = (int) (measureColumnChunkOffsets.get(blockIndex + 1) - measureColumnChunkOffsets
+          .get(blockIndex));
+    }
+    // allocate the buffer
+    ByteBuffer buffer = ByteBuffer.allocateDirect(dataLength);
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader
+          .readByteBuffer(filePath, buffer, measureColumnChunkOffsets.get(blockIndex), dataLength);
+    }
+    // get the data chunk which will have all the details about the data pages
+    DataChunk3 dataChunk =
+        CarbonUtil.readDataChunk3(buffer, 0, measureColumnChunkLength.get(blockIndex));
+    // creating a raw chunks instance and filling all the details
+    MeasureRawColumnChunk rawColumnChunk =
+        new MeasureRawColumnChunk(blockIndex, buffer, 0, dataLength, this);
+    int numberOfPages = dataChunk.getPage_length().size();
+    byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+    byte[][] minValueOfEachPage = new byte[numberOfPages][];
+    int[] eachPageLength = new int[numberOfPages];
+    // collect min, max and row count of every page, only the first min/max entry is used
+    // NOTE(review): array() returns the whole backing array of the thrift buffer,
+    // confirm it never carries an offset or extra bytes
+    for (int i = 0; i < minValueOfEachPage.length; i++) {
+      maxValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMax_values().get(0).array();
+      minValueOfEachPage[i] =
+          dataChunk.getData_chunk_list().get(i).getMin_max().getMin_values().get(0).array();
+      eachPageLength[i] = dataChunk.getData_chunk_list().get(i).getNumberOfRowsInpage();
+    }
+    rawColumnChunk.setDataChunkV3(dataChunk);
+    rawColumnChunk.setFileReader(fileReader);
+    rawColumnChunk.setPagesCount(dataChunk.getPage_length().size());
+    rawColumnChunk.setMaxValues(maxValueOfEachPage);
+    rawColumnChunk.setMinValues(minValueOfEachPage);
+    rawColumnChunk.setRowCount(eachPageLength);
+    // page lengths and page offsets are handed over as primitive arrays
+    rawColumnChunk.setLengths(ArrayUtils
+        .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+    rawColumnChunk.setOffsets(ArrayUtils
+        .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+    return rawColumnChunk;
+  }
+
+  /**
+   * Below method will be used to read the multiple measure column data in group
+   * and divide into measure raw chunk object
+   * Steps for reading
+   * 1. Get the length of the data to be read
+   * 2. Allocate the direct buffer
+   * 3. read the data from file
+   * 4. Get the data chunk object from the buffer for each column
+   * 5. Create the raw chunk object and fill the details for each column
+   * 6. increment the offset of the data
+   *
+   * @param fileReader
+   *        reader which will be used to read the measure columns data from file
+   * @param startColumnBlockletIndex
+   *        blocklet index of the first measure column
+   * @param endColumnBlockletIndex
+   *        blocklet index of the last measure column (inclusive)
+   * @return MeasureRawColumnChunk array, one entry per column of the range
+   * @throws IOException
+   */
+  protected MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
+      int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException {
+    int lastColumnIndex = measureColumnChunkOffsets.size() - 1;
+    // to calculate the length of the data to be read
+    // column we can subtract the offset of start column offset with
+    // end column+1 offset and get the total length.
+    // the last measure column has no following offset, for it measureOffsets
+    // (end position of the last measure) is used, same as in readRawMeasureChunk
+    long currentMeasureOffset = measureColumnChunkOffsets.get(startColumnBlockletIndex);
+    long groupEndOffset = endColumnBlockletIndex == lastColumnIndex ? measureOffsets
+        : measureColumnChunkOffsets.get(endColumnBlockletIndex + 1);
+    int groupLength = (int) (groupEndOffset - currentMeasureOffset);
+    ByteBuffer buffer = ByteBuffer.allocateDirect(groupLength);
+    // read the data from carbon data file
+    synchronized (fileReader) {
+      fileReader.readByteBuffer(filePath, buffer, currentMeasureOffset, groupLength);
+    }
+    // create raw chunk for each measure column
+    MeasureRawColumnChunk[] measureDataChunk =
+        new MeasureRawColumnChunk[endColumnBlockletIndex - startColumnBlockletIndex + 1];
+    // start position of the current column inside the shared buffer
+    int runningLength = 0;
+    int index = 0;
+    for (int i = startColumnBlockletIndex; i <= endColumnBlockletIndex; i++) {
+      long nextColumnOffset =
+          i == lastColumnIndex ? measureOffsets : measureColumnChunkOffsets.get(i + 1);
+      int currentLength = (int) (nextColumnOffset - measureColumnChunkOffsets.get(i));
+      MeasureRawColumnChunk measureRawColumnChunk =
+          new MeasureRawColumnChunk(i, buffer, runningLength, currentLength, this);
+      DataChunk3 dataChunk =
+          CarbonUtil.readDataChunk3(buffer, runningLength, measureColumnChunkLength.get(i));
+
+      int numberOfPages = dataChunk.getPage_length().size();
+      byte[][] maxValueOfEachPage = new byte[numberOfPages][];
+      byte[][] minValueOfEachPage = new byte[numberOfPages][];
+      int[] eachPageLength = new int[numberOfPages];
+      // collect min, max and row count of every page, only the first min/max entry is used
+      for (int j = 0; j < numberOfPages; j++) {
+        maxValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMax_values().get(0).array();
+        minValueOfEachPage[j] =
+            dataChunk.getData_chunk_list().get(j).getMin_max().getMin_values().get(0).array();
+        eachPageLength[j] = dataChunk.getData_chunk_list().get(j).getNumberOfRowsInpage();
+      }
+      measureRawColumnChunk.setDataChunkV3(dataChunk);
+      measureRawColumnChunk.setFileReader(fileReader);
+      measureRawColumnChunk.setPagesCount(numberOfPages);
+      measureRawColumnChunk.setMaxValues(maxValueOfEachPage);
+      measureRawColumnChunk.setMinValues(minValueOfEachPage);
+      measureRawColumnChunk.setRowCount(eachPageLength);
+      measureRawColumnChunk.setLengths(ArrayUtils
+          .toPrimitive(dataChunk.page_length.toArray(new Integer[dataChunk.page_length.size()])));
+      measureRawColumnChunk.setOffsets(ArrayUtils
+          .toPrimitive(dataChunk.page_offset.toArray(new Integer[dataChunk.page_offset.size()])));
+      measureDataChunk[index] = measureRawColumnChunk;
+      runningLength += currentLength;
+      index++;
+    }
+    return measureDataChunk;
+  }
+
+  /**
+   * Below method will be used to convert the compressed measure chunk raw data to actual data
+   * for one page of the column
+   *
+   * @param measureRawColumnChunk measure raw chunk
+   * @param pageNumber            number of the page inside the raw chunk to be converted
+   * @return MeasureColumnDataChunk holding the uncompressed values and the null value indexes
+   * @throws IOException
+   */
+  @Override public MeasureColumnDataChunk convertToMeasureChunk(
+      MeasureRawColumnChunk measureRawColumnChunk, int pageNumber) throws IOException {
+    MeasureColumnDataChunk datChunk = new MeasureColumnDataChunk();
+    // data chunk of blocklet column
+    DataChunk3 dataChunk3 = measureRawColumnChunk.getDataChunkV3();
+    // data chunk of page
+    DataChunk2 measureColumnChunk = dataChunk3.getData_chunk_list().get(pageNumber);
+    // calculating the start point of data
+    // as buffer can contain multiple column data, start point will be datachunkoffset +
+    // data chunk length + page offset
+    int copyPoint = measureRawColumnChunk.getOffSet() + measureColumnChunkLength
+        .get(measureRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
+    // deserialize every encoder meta of the page to build the compression model
+    List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
+    for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
+      valueEncodeMeta.add(CarbonUtil
+          .deserializeEncoderMetaNew(measureColumnChunk.getEncoder_meta().get(i).array()));
+    }
+    WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
+    // only the first entry of the compression model is used for the page
+    ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+    // uncompress
+    byte[] data = new byte[measureColumnChunk.data_page_length];
+    ByteBuffer rawData = measureRawColumnChunk.getRawData();
+    // moves the position of the raw buffer itself before copying the page bytes
+    rawData.position(copyPoint);
+    rawData.get(data);
+    values.uncompress(compressionModel.getConvertedDataType()[0], data, 0,
+        measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
+        compressionModel.getMaxValue()[0], measureRawColumnChunk.getRowCount()[pageNumber]);
+    CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
+    // set the data chunk
+    datChunk.setMeasureDataHolder(measureDataHolder);
+    // set the null value indexes
+    datChunk.setNullValueIndexHolder(getPresenceMeta(measureColumnChunk.presence));
+    return datChunk;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
index 14e7938..23af707 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeFixedLengthDimensionDataChunkStore.java
@@ -18,6 +18,7 @@
 package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
 
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
 
 /**
  * Below class will be used to store fixed length dimension data
@@ -66,13 +67,7 @@ public class SafeFixedLengthDimensionDataChunkStore extends SafeAbsractDimension
     }
     // below part is to convert the byte array to surrogate value
     int startOffsetOfData = index * columnValueSize;
-    int surrogate = 0;
-    for (int i = 0; i < columnValueSize; i++) {
-      surrogate <<= 8;
-      surrogate ^= data[startOffsetOfData] & 0xFF;
-      startOffsetOfData++;
-    }
-    return surrogate;
+    return CarbonUtil.getSurrogateInternal(data, startOffsetOfData, columnValueSize);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
new file mode 100644
index 0000000..346d8d8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForShort.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Index storage for the values of one column, addressing entries with {@code short} positions.
+ *
+ * <p>The constructor pairs every entry of the key block with its position, optionally sorts the
+ * pairs by value, compresses the resulting position sequence and, when requested, collapses
+ * consecutive identical values.
+ */
+public class BlockIndexerStorageForShort implements IndexStorage<short[]> {
+
+  /** True when the stored positions collapsed into exactly one consecutive run. */
+  private boolean alreadySorted;
+
+  /** Positions in stored order, either verbatim or reduced to run boundaries. */
+  private short[] dataAfterComp;
+
+  /** Offsets in {@link #dataAfterComp} where a run starts; empty if stored verbatim. */
+  private short[] indexMap;
+
+  /** Column values in stored order; de-duplicated when data compression was requested. */
+  private byte[][] keyBlock;
+
+  /** (start, count) pairs for runs of equal values; stays null without data compression. */
+  private short[] dataIndexMap;
+
+  /** Summed byte length of the values kept by data compression; 0 without it. */
+  private int totalSize;
+
+  /**
+   * Builds all index structures for the given key block.
+   *
+   * @param keyBlock       column values in input order; overwritten in place with the values in
+   *                       stored order. Must not hold more than {@link Short#MAX_VALUE} entries
+   * @param compressData   whether consecutive identical values are collapsed
+   * @param isNoDictionary whether entries are wrapped as no-dictionary values, which changes
+   *                       the ordering used when sorting
+   * @param isSortRequired whether the values are sorted before the index is built
+   * @throws IllegalArgumentException if {@code keyBlock} is too large for short positions
+   */
+  public BlockIndexerStorageForShort(byte[][] keyBlock, boolean compressData,
+      boolean isNoDictionary, boolean isSortRequired) {
+    ColumnWithShortIndex[] columnWithIndexs = createColumnWithIndexArray(keyBlock, isNoDictionary);
+    if (isSortRequired) {
+      Arrays.sort(columnWithIndexs);
+    }
+    compressMyOwnWay(extractDataAndReturnIndexes(columnWithIndexs, keyBlock));
+    if (compressData) {
+      compressDataMyOwnWay(columnWithIndexs);
+    }
+  }
+
+  /**
+   * Wraps every value together with its position in {@code keyBlock}.
+   *
+   * @return one wrapper per entry, in input order
+   * @throws IllegalArgumentException if {@code keyBlock} is too large for short positions
+   */
+  private ColumnWithShortIndex[] createColumnWithIndexArray(byte[][] keyBlock,
+      boolean isNoDictionary) {
+    // Positions are stored as short; a larger block would wrap around to negative positions.
+    if (keyBlock.length > Short.MAX_VALUE) {
+      throw new IllegalArgumentException(
+          "Key block size " + keyBlock.length + " exceeds short index limit " + Short.MAX_VALUE);
+    }
+    ColumnWithShortIndex[] columnWithIndexs = new ColumnWithShortIndex[keyBlock.length];
+    for (int i = 0; i < columnWithIndexs.length; i++) {
+      short position = (short) i;
+      columnWithIndexs[i] = isNoDictionary
+          ? new ColumnWithShortIndexForNoDictionay(keyBlock[i], position)
+          : new ColumnWithShortIndex(keyBlock[i], position);
+    }
+    return columnWithIndexs;
+  }
+
+  /**
+   * Copies the values back into {@code keyBlock} in stored order, keeps that array as the key
+   * block of this storage and returns the matching original positions.
+   */
+  private short[] extractDataAndReturnIndexes(ColumnWithShortIndex[] columnWithIndexs,
+      byte[][] keyBlock) {
+    short[] indexes = new short[columnWithIndexs.length];
+    for (int i = 0; i < indexes.length; i++) {
+      indexes[i] = columnWithIndexs[i].getIndex();
+      keyBlock[i] = columnWithIndexs[i].getColumn();
+    }
+    this.keyBlock = keyBlock;
+    return indexes;
+  }
+
+  /**
+   * Compresses runs of consecutive positions. Each run is replaced by its first and last
+   * element, and the offset of the run start inside the compressed array goes into the index
+   * map: [1,2,3,4,6,8,10,11,12,13] gives the candidate [1,4,6,8,10,13] with run starts [0,4].
+   * The candidate is kept only if candidate plus run starts do not exceed 70% of the input
+   * length; otherwise the input is stored unchanged with an empty index map (the example is
+   * such a case: 8 entries for 10 inputs is 80%).
+   *
+   * @param indexes positions in stored order; an empty array is stored as is
+   */
+  public void compressMyOwnWay(short[] indexes) {
+    if (indexes.length == 0) {
+      // Nothing to compress; the code below assumes at least one entry.
+      dataAfterComp = indexes;
+      indexMap = new short[0];
+      return;
+    }
+    List<Short> list = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    // k counts how many elements the current run already extends past its first element
+    int k = 0;
+    int i = 1;
+    for (; i < indexes.length; i++) {
+      if (indexes[i] - indexes[i - 1] == 1) {
+        k++;
+      } else {
+        if (k > 0) {
+          map.add(((short) list.size()));
+          list.add(indexes[i - k - 1]);
+          list.add(indexes[i - 1]);
+        } else {
+          list.add(indexes[i - 1]);
+        }
+        k = 0;
+      }
+    }
+    // flush the run (or single element) that reaches the end of the array
+    if (k > 0) {
+      map.add(((short) list.size()));
+      list.add(indexes[i - k - 1]);
+      list.add(indexes[i - 1]);
+    } else {
+      list.add(indexes[i - 1]);
+    }
+    // note: integer division, so the percentage is truncated before the comparison
+    double compressionPercentage = (((list.size() + map.size()) * 100) / indexes.length);
+    if (compressionPercentage > 70) {
+      dataAfterComp = indexes;
+    } else {
+      dataAfterComp = convertToArray(list);
+    }
+    if (indexes.length == dataAfterComp.length) {
+      indexMap = new short[0];
+    } else {
+      indexMap = convertToArray(map);
+    }
+    // one single run covering everything: the positions were consecutive from start to end
+    if (dataAfterComp.length == 2 && indexMap.length == 1) {
+      alreadySorted = true;
+    }
+  }
+
+  /** Unboxes the list into a primitive array of the same length and order. */
+  private short[] convertToArray(List<Short> list) {
+    short[] shortArray = new short[list.size()];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i);
+    }
+    return shortArray;
+  }
+
+  /**
+   * @return true when the stored positions formed one single consecutive run
+   */
+  public boolean isAlreadySorted() {
+    return alreadySorted;
+  }
+
+  /**
+   * @return the positions in stored order, verbatim or reduced to run boundaries
+   */
+  public short[] getDataAfterComp() {
+    return dataAfterComp;
+  }
+
+  /**
+   * @return the run start offsets for {@link #getDataAfterComp()}; empty if stored verbatim
+   */
+  public short[] getIndexMap() {
+    return indexMap;
+  }
+
+  /**
+   * @return the column values in stored order
+   */
+  public byte[][] getKeyBlock() {
+    return keyBlock;
+  }
+
+  /**
+   * Collapses consecutive equal values. The key block is replaced by the first value of every
+   * run and the data index map receives a (start, count) pair per run; if no two neighbours
+   * are equal the data index map is left empty.
+   *
+   * @param indexes value/position pairs in stored order
+   */
+  private void compressDataMyOwnWay(ColumnWithShortIndex[] indexes) {
+    if (indexes.length == 0) {
+      // Empty input: the key block is already empty and there are no runs to describe.
+      dataIndexMap = new short[0];
+      return;
+    }
+    byte[] prvKey = indexes[0].getColumn();
+    List<ColumnWithShortIndex> list =
+        new ArrayList<ColumnWithShortIndex>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    list.add(indexes[0]);
+    short counter = 1;
+    short start = 0;
+    List<Short> map = new ArrayList<Short>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    for (int i = 1; i < indexes.length; i++) {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(prvKey, indexes[i].getColumn()) != 0) {
+        prvKey = indexes[i].getColumn();
+        list.add(indexes[i]);
+        map.add(start);
+        map.add(counter);
+        start += counter;
+        counter = 1;
+        continue;
+      }
+      counter++;
+    }
+    map.add(start);
+    map.add(counter);
+    this.keyBlock = convertToKeyArray(list);
+    if (indexes.length == keyBlock.length) {
+      dataIndexMap = new short[0];
+    } else {
+      dataIndexMap = convertToArray(map);
+    }
+  }
+
+  /** Extracts the values of the given pairs and adds their byte lengths to the total size. */
+  private byte[][] convertToKeyArray(List<ColumnWithShortIndex> list) {
+    byte[][] shortArray = new byte[list.size()][];
+    for (int i = 0; i < shortArray.length; i++) {
+      shortArray[i] = list.get(i).getColumn();
+      totalSize += shortArray[i].length;
+    }
+    return shortArray;
+  }
+
+  /** Returns the (start, count) run pairs; null when data compression was not requested. */
+  @Override public short[] getDataIndexMap() {
+    return dataIndexMap;
+  }
+
+  /** Returns the summed byte length of the values kept by data compression (0 without it). */
+  @Override public int getTotalSize() {
+    return totalSize;
+  }
+
+  /** Returns the first stored value; throws ArrayIndexOutOfBoundsException if there is none. */
+  @Override public byte[] getMin() {
+    return keyBlock[0];
+  }
+
+  /** Returns the last stored value; throws ArrayIndexOutOfBoundsException if there is none. */
+  @Override public byte[] getMax() {
+    return keyBlock[keyBlock.length - 1];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
new file mode 100644
index 0000000..57447a3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndex.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Pairs the bytes of a column value with a {@code short} index.
+ *
+ * <p>Natural ordering looks at the value bytes only, while {@link #equals(Object)} and
+ * {@link #hashCode()} also take the index into account.
+ */
+public class ColumnWithShortIndex implements Comparable<ColumnWithShortIndex> {
+  /** Raw bytes of the column value. */
+  protected byte[] column;
+
+  /** Index associated with the value. */
+  private short index;
+
+  /**
+   * @param column raw bytes of the column value (kept by reference, not copied)
+   * @param index  index associated with the value
+   */
+  public ColumnWithShortIndex(byte[] column, short index) {
+    this.index = index;
+    this.column = column;
+  }
+
+  /**
+   * @return the value bytes held by this pair
+   */
+  public byte[] getColumn() {
+    return this.column;
+  }
+
+  /**
+   * @param column the value bytes to hold from now on
+   */
+  public void setColumn(byte[] column) {
+    this.column = column;
+  }
+
+  /**
+   * @return the index held by this pair
+   */
+  public short getIndex() {
+    return this.index;
+  }
+
+  /**
+   * @param index the index to hold from now on
+   */
+  public void setIndex(short index) {
+    this.index = index;
+  }
+
+  /**
+   * Orders two pairs by their value bytes; the index does not take part.
+   */
+  @Override public int compareTo(ColumnWithShortIndex o) {
+    byte[] otherColumn = o.column;
+    return ByteUtil.UnsafeComparer.INSTANCE.compareTo(this.column, otherColumn);
+  }
+
+  /**
+   * Pairs are equal when they share the runtime class, the value bytes and the index.
+   */
+  @Override public boolean equals(Object obj) {
+    if (obj != null && getClass() == obj.getClass()) {
+      ColumnWithShortIndex other = (ColumnWithShortIndex) obj;
+      return Arrays.equals(column, other.column) && index == other.index;
+    }
+    return false;
+  }
+
+  @Override public int hashCode() {
+    return index + Arrays.hashCode(column);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
new file mode 100644
index 0000000..34cce63
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/ColumnWithShortIndexForNoDictionay.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.Arrays;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+/**
+ * {@link ColumnWithShortIndex} variant used for no-dictionary column values.
+ *
+ * <p>Ordering ignores the first two bytes of each value (presumably a length prefix - TODO
+ * confirm against the writer); equality and hashing use the complete byte array and the index.
+ */
+public class ColumnWithShortIndexForNoDictionay extends ColumnWithShortIndex
+    implements Comparable<ColumnWithShortIndex> {
+
+  public ColumnWithShortIndexForNoDictionay(byte[] column, short index) {
+    super(column, index);
+  }
+
+  /**
+   * Compares the value bytes from offset 2 onwards, skipping the first two bytes of both values.
+   */
+  @Override public int compareTo(ColumnWithShortIndex o) {
+    return UnsafeComparer.INSTANCE
+        .compareTo(column, 2, column.length - 2, o.column, 2, o.column.length - 2);
+  }
+
+  /**
+   * Instances are equal when they have the same runtime class, the same value bytes and the
+   * same index.
+   */
+  @Override public boolean equals(Object obj) {
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    // The getClass() check above guarantees that obj is of this very class.
+    ColumnWithShortIndexForNoDictionay o = (ColumnWithShortIndexForNoDictionay) obj;
+    return Arrays.equals(column, o.column) && getIndex() == o.getIndex();
+  }
+
+  @Override public int hashCode() {
+    return Arrays.hashCode(column) + getIndex();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
index 7629895..240f891 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnarFormatVersion.java
@@ -19,7 +19,8 @@ package org.apache.carbondata.core.metadata;
 
 public enum ColumnarFormatVersion {
   V1((short)1),
-  V2((short)2);
+  V2((short)2),
+  V3((short)3);
 
   private short version;
   ColumnarFormatVersion(short version) {
@@ -43,8 +44,12 @@ public enum ColumnarFormatVersion {
       case 1:
         // after multiple reader support, user can write new file with version 1
         return V1;
-      default:
+      case 2:
         return V2;
+      case 3:
+        return V3;
+      default:
+        return V3;
     }
   }
 }