Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/26 14:54:30 UTC

[2/4] incubator-carbondata git commit: Added V3 Format Writer and Reader Code

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0c2e8ab..1e8207c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.common.logging.impl.StandardLogService;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.BlockIndexStore;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
@@ -60,6 +61,7 @@ import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.stats.QueryStatistic;
 import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -336,6 +338,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
     int[] dimensionsBlockIndexes = QueryUtil.getDimensionsBlockIndexes(updatedQueryDimension,
         segmentProperties.getDimensionOrdinalToBlockMapping(), expressionDimensions,
         queryProperties.complexFilterDimension, allProjectionListDimensionIdexes);
+    int numberOfColumnToBeReadInOneIO = Integer.parseInt(CarbonProperties.getInstance()
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+            CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE));
+
     if (dimensionsBlockIndexes.length > 0) {
       numberOfElementToConsider = dimensionsBlockIndexes[dimensionsBlockIndexes.length - 1]
           == segmentProperties.getBlockTodimensionOrdinalMapping().size() - 1 ?
@@ -343,7 +349,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
           dimensionsBlockIndexes.length;
       blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(CarbonUtil
           .getRangeIndex(dimensionsBlockIndexes, numberOfElementToConsider,
-              CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+              numberOfColumnToBeReadInOneIO));
     } else {
       blockExecutionInfo.setAllSelectedDimensionBlocksIndexes(new int[0][0]);
     }
@@ -362,7 +368,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
       // setting all the measure chunk indexes to be read from file
       blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(CarbonUtil
           .getRangeIndex(measureBlockIndexes, numberOfElementToConsider,
-              CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO));
+              numberOfColumnToBeReadInOneIO));
     } else {
       blockExecutionInfo.setAllSelectedMeasureBlocksIndexes(new int[0][0]);
     }

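The change above swaps the hard-coded CarbonCommonConstants.NUMBER_OF_COLUMN_READ_IN_IO for the configurable V3 property and passes it to CarbonUtil.getRangeIndex, which merges the sorted column block indexes into [start, end] ranges so that each range can be fetched in a single IO request. A minimal standalone sketch of that grouping idea, assuming consecutive indexes are merged until a gap or the per-IO limit is hit (the class, method name, and exact splitting rule here are illustrative, not the project's code):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Sketch: group sorted column block indexes into [start, end] ranges,
    // closing a range on a gap in the indexes or when it reaches the
    // configured number of columns per IO.
    public class RangeIndexSketch {
      static int[][] toRanges(int[] indexes, int numOfElements, int maxPerIo) {
        List<int[]> ranges = new ArrayList<>();
        int start = 0;
        for (int i = 1; i <= numOfElements; i++) {
          if (i == numOfElements || indexes[i] != indexes[i - 1] + 1
              || (i - start) >= maxPerIo) {
            ranges.add(new int[] { indexes[start], indexes[i - 1] });
            start = i;
          }
        }
        return ranges.toArray(new int[ranges.size()][]);
      }

      public static void main(String[] args) {
        int[] blocks = { 0, 1, 2, 4, 5, 6, 7, 8 };
        // with a limit of 3 columns per IO: [[0, 2], [4, 6], [7, 8]]
        System.out.println(Arrays.deepToString(toRanges(blocks, blocks.length, 3)));
      }
    }

Reading several adjacent column chunks per request is what lets the V3 reader trade a bounded amount of extra data volume for fewer seeks.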
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
index d2523a1..c64f498 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/IncludeColGroupFilterExecuterImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.core.scan.filter.executer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -24,11 +25,14 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.executor.util.QueryUtil;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -80,6 +84,26 @@ public class IncludeColGroupFilterExecuterImpl extends IncludeFilterExecuterImpl
     return bitSet;
   }
 
+  @Override public BitSetGroup applyFilter(BlocksChunkHolder blockChunkHolder) throws IOException {
+    int blockIndex = segmentProperties.getDimensionOrdinalToBlockMapping()
+        .get(dimColumnEvaluatorInfo.getColumnIndex());
+    if (null == blockChunkHolder.getDimensionRawDataChunk()[blockIndex]) {
+      blockChunkHolder.getDimensionRawDataChunk()[blockIndex] = blockChunkHolder.getDataBlock()
+          .getDimensionChunk(blockChunkHolder.getFileReader(), blockIndex);
+    }
+    DimensionRawColumnChunk dimensionRawColumnChunk =
+        blockChunkHolder.getDimensionRawDataChunk()[blockIndex];
+    BitSetGroup bitSetGroup = new BitSetGroup(dimensionRawColumnChunk.getPagesCount());
+    for (int i = 0; i < dimensionRawColumnChunk.getPagesCount(); i++) {
+      if (dimensionRawColumnChunk.getMaxValues() != null) {
+        BitSet bitSet = getFilteredIndexes(dimensionRawColumnChunk.convertToDimColDataChunk(i),
+            dimensionRawColumnChunk.getRowCount()[i]);
+        bitSetGroup.setBitSet(bitSet, i);
+      }
+    }
+    return bitSetGroup;
+  }
+
   /**
    * It is required for extracting column data from columngroup chunk
    *

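The applyFilter override added above is the V3 page-wise form of the include filter: the raw column chunk is loaded lazily into the BlocksChunkHolder, and one BitSet of matching row ids is produced per page and collected into a BitSetGroup. A self-contained analogue with int arrays standing in for the Carbon chunk types (all names below are hypothetical):

    import java.util.BitSet;

    // Sketch: build one BitSet per page, setting a bit for every row whose
    // value matches the filter, mirroring the per-page loop above.
    public class PageFilterSketch {
      static BitSet[] applyFilter(int[][] pages, int match) {
        BitSet[] group = new BitSet[pages.length]; // plays the role of BitSetGroup
        for (int page = 0; page < pages.length; page++) {
          BitSet bits = new BitSet(pages[page].length);
          for (int row = 0; row < pages[page].length; row++) {
            if (pages[page][row] == match) {
              bits.set(row);
            }
          }
          group[page] = bits;
        }
        return group;
      }

      public static void main(String[] args) {
        int[][] pages = { { 1, 2, 1 }, { 3, 1 } };
        BitSet[] result = applyFilter(pages, 1);
        System.out.println(result[0] + " " + result[1]); // {0, 2} {1}
      }
    }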
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 2bdce8d..c7e2acc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl extends RowLevelFilter
     for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
       if (rawColumnChunk.getMinValues() != null) {
         if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
-          int compare = ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
-          if (compare >= 0) {
-            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
-            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          } else {
-            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
-                rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          }
+          BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+              rawColumnChunk.getRowCount()[i]);
+          bitSetGroup.setBitSet(bitSet, i);
         }
       } else {
         BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),

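The deleted branch short-circuited a page whose max value already satisfied the less-than-equal filter by flipping a full BitSet instead of decoding the page; after this change every surviving page is decoded and filtered row by row (the next file applies the same simplification to the strict less-than executer). The min-based check deciding whether a page is scanned at all is kept. A sketch of that pruning decision, with ByteUtil's unsigned lexicographic comparison re-implemented locally for the example:

    import java.nio.charset.StandardCharsets;

    // Sketch: for a "col <= filter" predicate, a page needs scanning only
    // when the filter is not smaller than the page's min value.
    public class PagePruneSketch {
      static int compareUnsigned(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
          int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
          if (diff != 0) {
            return diff;
          }
        }
        return a.length - b.length;
      }

      static boolean isScanRequired(byte[] pageMin, byte[] filter) {
        return compareUnsigned(filter, pageMin) >= 0;
      }

      public static void main(String[] args) {
        byte[] min = "b".getBytes(StandardCharsets.UTF_8);
        System.out.println(isScanRequired(min, "c".getBytes(StandardCharsets.UTF_8))); // true
        System.out.println(isScanRequired(min, "a".getBytes(StandardCharsets.UTF_8))); // false
      }
    }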
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index ae9ba8a..d9795eb 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -96,17 +96,9 @@ public class RowLevelRangeLessThanFiterExecuterImpl extends RowLevelFilterExecut
     for (int i = 0; i < rawColumnChunk.getPagesCount(); i++) {
       if (rawColumnChunk.getMinValues() != null) {
         if (isScanRequired(rawColumnChunk.getMinValues()[i], this.filterRangeValues)) {
-          int compare = ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(filterRangeValues[0], rawColumnChunk.getMaxValues()[i]);
-          if (compare > 0) {
-            BitSet bitSet = new BitSet(rawColumnChunk.getRowCount()[i]);
-            bitSet.flip(0, rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          } else {
-            BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
-                rawColumnChunk.getRowCount()[i]);
-            bitSetGroup.setBitSet(bitSet, i);
-          }
+          BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),
+              rawColumnChunk.getRowCount()[i]);
+          bitSetGroup.setBitSet(bitSet, i);
         }
       } else {
         BitSet bitSet = getFilteredIndexes(rawColumnChunk.convertToDimColDataChunk(i),

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
index 07e3487..323042a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BitSetGroup.java
@@ -52,9 +52,11 @@ public class BitSetGroup {
   public void and(BitSetGroup group) {
     int i = 0;
     for (BitSet bitSet : bitSets) {
-      BitSet otherSet  = group.getBitSet(i);
+      BitSet otherSet = group.getBitSet(i);
       if (bitSet != null && otherSet != null) {
         bitSet.and(otherSet);
+      } else {
+        bitSets[i] = null;
       }
       i++;
     }
@@ -63,7 +65,7 @@ public class BitSetGroup {
   public void or(BitSetGroup group) {
     int i = 0;
     for (BitSet bitSet : bitSets) {
-      BitSet otherSet  = group.getBitSet(i);
+      BitSet otherSet = group.getBitSet(i);
       if (bitSet != null && otherSet != null) {
         bitSet.or(otherSet);
       }

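The added else branch in BitSetGroup.and changes what happens when either side lacks a BitSet for a page: previously the left operand's page survived the AND untouched, now the result page becomes null, so a page with no definite matches on one side cannot leak rows through an intersection. A minimal sketch of the fixed semantics with plain BitSet arrays (BitSetGroup itself is only mirrored here):

    import java.util.BitSet;

    // Sketch of the fixed AND: pages intersect normally, and a missing
    // BitSet on either side nulls the result page instead of keeping the
    // left operand unchanged.
    public class BitSetGroupAndSketch {
      static void and(BitSet[] left, BitSet[] right) {
        for (int i = 0; i < left.length; i++) {
          if (left[i] != null && right[i] != null) {
            left[i].and(right[i]);
          } else {
            left[i] = null; // before the fix, left[i] survived here
          }
        }
      }

      public static void main(String[] args) {
        BitSet a = new BitSet();
        a.set(0);
        a.set(2);
        BitSet b = new BitSet();
        b.set(2);
        BitSet[] left = { a, (BitSet) a.clone() };
        BitSet[] right = { b, null };
        and(left, right);
        System.out.println(left[0] + " " + left[1]); // {2} null
      }
    }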
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 25e7cfe..55c0302 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -35,18 +36,22 @@ import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletBTreeIndex;
 import org.apache.carbondata.format.BlockletIndex;
 import org.apache.carbondata.format.BlockletInfo;
 import org.apache.carbondata.format.BlockletInfo2;
+import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ChunkCompressionMeta;
 import org.apache.carbondata.format.ColumnSchema;
 import org.apache.carbondata.format.CompressionCodec;
 import org.apache.carbondata.format.DataChunk;
 import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
 import org.apache.carbondata.format.Encoding;
 import org.apache.carbondata.format.FileFooter;
 import org.apache.carbondata.format.IndexHeader;
@@ -109,6 +114,50 @@ public class CarbonMetadataUtil {
   }
 
   /**
+   * It converts a list of BlockletInfo3 objects to the FileFooter thrift object
+   *
+   * @param infoList
+   * @param blockletIndexs
+   * @param cardinalities
+   * @param columnSchemaList
+   * @return FileFooter
+   */
+  public static FileFooter convertFileFooter3(List<BlockletInfo3> infoList,
+      List<BlockletIndex> blockletIndexs, int[] cardinalities, List<ColumnSchema> columnSchemaList,
+      SegmentProperties segmentProperties) throws IOException {
+    FileFooter footer = getFileFooter3(infoList, blockletIndexs, cardinalities, columnSchemaList);
+    for (BlockletInfo3 info : infoList) {
+      footer.addToBlocklet_info_list3(info);
+    }
+    return footer;
+  }
+
+  /**
+   * Below method will be used to get the file footer object
+   *
+   * @param infoList         blocklet info
+   * @param cardinalities    cardinality of dimension columns
+   * @param columnSchemaList column schema list
+   * @return file footer
+   */
+  private static FileFooter getFileFooter3(List<BlockletInfo3> infoList,
+      List<BlockletIndex> blockletIndexs, int[] cardinalities,
+      List<ColumnSchema> columnSchemaList) {
+    SegmentInfo segmentInfo = new SegmentInfo();
+    segmentInfo.setNum_cols(columnSchemaList.size());
+    segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
+    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+    FileFooter footer = new FileFooter();
+    footer.setVersion(version.number());
+    footer.setNum_rows(getNumberOfRowForFooter(infoList));
+    footer.setSegment_info(segmentInfo);
+    footer.setTable_columns(columnSchemaList);
+    for (BlockletIndex info : blockletIndexs) {
+      footer.addToBlocklet_index_list(info);
+    }
+    return footer;
+  }
+
+  /**
    * Below method will be used to get the file footer object for
    *
    * @param infoList         blocklet info
@@ -162,6 +211,20 @@ public class CarbonMetadataUtil {
     return numberOfRows;
   }
 
+  /**
+   * Get total number of rows for the file.
+   *
+   * @param infoList
+   * @return
+   */
+  private static long getNumberOfRowForFooter(List<BlockletInfo3> infoList) {
+    long numberOfRows = 0;
+    for (BlockletInfo3 info : infoList) {
+      numberOfRows += info.num_rows;
+    }
+    return numberOfRows;
+  }
+
   private static BlockletIndex getBlockletIndex(BlockletInfoColumnar info) {
 
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
@@ -181,9 +244,52 @@ public class CarbonMetadataUtil {
     return blockletIndex;
   }
 
+  public static BlockletIndex getBlockletIndex(List<NodeHolder> nodeHolderList,
+      List<CarbonMeasure> carbonMeasureList) {
+    BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
+    for (byte[] max : nodeHolderList.get(nodeHolderList.size() - 1).getColumnMaxData()) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+    }
+    for (byte[] min : nodeHolderList.get(0).getColumnMinData()) {
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+    }
+    byte[][] measureMaxValue = nodeHolderList.get(0).getMeasureColumnMaxData().clone();
+    byte[][] measureMinValue = nodeHolderList.get(0).getMeasureColumnMinData().clone();
+    byte[] minVal = null;
+    byte[] maxVal = null;
+    for (int i = 1; i < nodeHolderList.size(); i++) {
+      for (int j = 0; j < measureMinValue.length; j++) {
+        minVal = nodeHolderList.get(i).getMeasureColumnMinData()[j];
+        maxVal = nodeHolderList.get(i).getMeasureColumnMaxData()[j];
+        if (compareMeasureData(measureMaxValue[j], maxVal, carbonMeasureList.get(j).getDataType())
+            < 0) {
+          measureMaxValue[j] = maxVal.clone();
+        }
+        if (compareMeasureData(measureMinValue[j], minVal, carbonMeasureList.get(j).getDataType())
+            > 0) {
+          measureMinValue[j] = minVal.clone();
+        }
+      }
+    }
+
+    for (byte[] max : measureMaxValue) {
+      blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(max));
+    }
+    for (byte[] min : measureMinValue) {
+      blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
+    }
+    BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
+    blockletBTreeIndex.setStart_key(nodeHolderList.get(0).getStartKey());
+    blockletBTreeIndex.setEnd_key(nodeHolderList.get(nodeHolderList.size() - 1).getEndKey());
+    BlockletIndex blockletIndex = new BlockletIndex();
+    blockletIndex.setMin_max_index(blockletMinMaxIndex);
+    blockletIndex.setB_tree_index(blockletBTreeIndex);
+    return blockletIndex;
+  }
+
   /**
-   * Below method will be used to get the blocklet info object for
-   * data version 2 file
+   * Below method will be used to get the blocklet info object for data version
+   * 2 file
    *
    * @param blockletInfoColumnar blocklet info
    * @param dataChunkOffsets     data chunks offsets
@@ -222,7 +328,8 @@ public class CarbonMetadataUtil {
         encodings.add(Encoding.DIRECT_DICTIONARY);
       }
       dataChunk.setRowMajor(colGrpblock[i]);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setColumn_ids(new ArrayList<Integer>());
       dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
       dataChunk.setData_page_offset(blockletInfoColumnar.getKeyOffSets()[i]);
@@ -242,7 +349,8 @@ public class CarbonMetadataUtil {
         j++;
       }
 
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       dataChunk.setEncoders(encodings);
 
       colDataChunks.add(dataChunk);
@@ -252,24 +360,26 @@ public class CarbonMetadataUtil {
       DataChunk dataChunk = new DataChunk();
       dataChunk.setChunk_meta(getChunkCompressionMeta());
       dataChunk.setRowMajor(false);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setColumn_ids(new ArrayList<Integer>());
       dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
       dataChunk.setData_page_offset(blockletInfoColumnar.getMeasureOffset()[i]);
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       List<Encoding> encodings = new ArrayList<Encoding>();
       encodings.add(Encoding.DELTA);
       dataChunk.setEncoders(encodings);
-      //TODO writing dummy presence meta need to set actual presence
-      //meta
+      // TODO writing dummy presence meta need to set actual presence
+      // meta
       PresenceMeta presenceMeta = new PresenceMeta();
       presenceMeta.setPresent_bit_streamIsSet(true);
       presenceMeta
           .setPresent_bit_stream(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray());
       dataChunk.setPresence(presenceMeta);
-      //TODO : PresenceMeta needs to be implemented and set here
+      // TODO : PresenceMeta needs to be implemented and set here
       // dataChunk.setPresence(new PresenceMeta());
-      //TODO : Need to write ValueCompression meta here.
+      // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
       encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
           createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -291,7 +401,7 @@ public class CarbonMetadataUtil {
   private static boolean containsEncoding(int blockIndex, Encoding encoding,
       List<ColumnSchema> columnSchemas, SegmentProperties segmentProperties) {
     Set<Integer> dimOrdinals = segmentProperties.getDimensionOrdinalForBlock(blockIndex);
-    //column groups will always have dictionary encoding
+    // column groups will always have dictionary encoding
     if (dimOrdinals.size() > 1 && Encoding.DICTIONARY == encoding) {
       return true;
     }
@@ -336,7 +446,8 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * It converts FileFooter thrift object to list of BlockletInfoColumnar objects
+   * It converts FileFooter thrift object to list of BlockletInfoColumnar
+   * objects
    *
    * @param footer
    * @return
@@ -486,8 +597,8 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Below method will be used to get the block index info thrift object for each block
-   * present in the segment
+   * Below method will be used to get the block index info thrift object for
+   * each block present in the segment
    *
    * @param blockIndexInfoList block index info list
    * @return list of block index
@@ -508,8 +619,7 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Below method will be used to get the data chunk object for all the
-   * columns
+   * Below method will be used to get the data chunk object for all the columns
    *
    * @param blockletInfoColumnar blocklet info
    * @param columnSchenma        list of columns
@@ -536,7 +646,8 @@ public class CarbonMetadataUtil {
         encodings.add(Encoding.DIRECT_DICTIONARY);
       }
       dataChunk.setRowMajor(colGrpblock[i]);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
       if (aggKeyBlock[i]) {
         dataChunk.setRle_page_length(blockletInfoColumnar.getDataIndexMapLength()[aggregateIndex]);
@@ -552,7 +663,8 @@ public class CarbonMetadataUtil {
         rowIdIndex++;
       }
 
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       dataChunk.setEncoders(encodings);
 
       colDataChunks.add(dataChunk);
@@ -562,22 +674,24 @@ public class CarbonMetadataUtil {
       DataChunk2 dataChunk = new DataChunk2();
       dataChunk.setChunk_meta(getChunkCompressionMeta());
       dataChunk.setRowMajor(false);
-      //TODO : Once schema PR is merged and information needs to be passed here.
+      // TODO : Once schema PR is merged and information needs to be passed
+      // here.
       dataChunk.setData_page_length(blockletInfoColumnar.getMeasureLength()[i]);
-      //TODO : Right now the encodings are happening at runtime. change as per this encoders.
+      // TODO : Right now the encodings are happening at runtime. change as per
+      // this encoders.
       List<Encoding> encodings = new ArrayList<Encoding>();
       encodings.add(Encoding.DELTA);
       dataChunk.setEncoders(encodings);
-      //TODO writing dummy presence meta need to set actual presence
-      //meta
+      // TODO writing dummy presence meta need to set actual presence
+      // meta
       PresenceMeta presenceMeta = new PresenceMeta();
       presenceMeta.setPresent_bit_streamIsSet(true);
       presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
           .compressByte(blockletInfoColumnar.getMeasureNullValueIndex()[i].toByteArray()));
       dataChunk.setPresence(presenceMeta);
-      //TODO : PresenceMeta needs to be implemented and set here
+      // TODO : PresenceMeta needs to be implemented and set here
       // dataChunk.setPresence(new PresenceMeta());
-      //TODO : Need to write ValueCompression meta here.
+      // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
       encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
           createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
@@ -586,4 +700,189 @@ public class CarbonMetadataUtil {
     }
     return colDataChunks;
   }
+
+  /**
+   * Below method will be used to get the data chunk object for all the columns
+   *
+   * @param nodeHolderList       list of node holders, one per page
+   * @param columnSchenma        list of columns
+   * @param segmentProperties    segment properties
+   * @param index                index of the column to build chunks for
+   * @param isDimensionColumn    true if the column is a dimension column
+   * @return list of data chunks
+   * @throws IOException
+   */
+  private static List<DataChunk2> getDatachunk2(List<NodeHolder> nodeHolderList,
+      List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+      boolean isDimensionColumn) throws IOException {
+    List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>();
+    DataChunk2 dataChunk = null;
+    NodeHolder nodeHolder = null;
+    for (int i = 0; i < nodeHolderList.size(); i++) {
+      nodeHolder = nodeHolderList.get(i);
+      dataChunk = new DataChunk2();
+      dataChunk.min_max = new BlockletMinMaxIndex();
+      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount());
+      List<Encoding> encodings = new ArrayList<Encoding>();
+      if (isDimensionColumn) {
+        dataChunk.setData_page_length(nodeHolder.getKeyLengths()[index]);
+        if (containsEncoding(index, Encoding.DICTIONARY, columnSchenma, segmentProperties)) {
+          encodings.add(Encoding.DICTIONARY);
+        }
+        if (containsEncoding(index, Encoding.DIRECT_DICTIONARY, columnSchenma, segmentProperties)) {
+          encodings.add(Encoding.DIRECT_DICTIONARY);
+        }
+        dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[index]);
+        // TODO : Once schema PR is merged and information needs to be passed
+        // here.
+        if (nodeHolder.getAggBlocks()[index]) {
+          dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[index]);
+          encodings.add(Encoding.RLE);
+        }
+        dataChunk.setSort_state(nodeHolder.getIsSortedKeyBlock()[index] ?
+            SortState.SORT_EXPLICIT :
+            SortState.SORT_NATIVE);
+
+        if (!nodeHolder.getIsSortedKeyBlock()[index]) {
+          dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[index]);
+          encodings.add(Encoding.INVERTED_INDEX);
+        }
+        dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getColumnMaxData()[index]));
+        dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getColumnMinData()[index]));
+      } else {
+        dataChunk.setData_page_length(nodeHolder.getDataArray()[index].length);
+        dataChunk.setRowMajor(false);
+        // TODO : Right now the encodings are happening at runtime. change as
+        // per this encoders.
+        encodings.add(Encoding.DELTA);
+        // TODO writing dummy presence meta need to set actual presence
+        // meta
+        PresenceMeta presenceMeta = new PresenceMeta();
+        presenceMeta.setPresent_bit_streamIsSet(true);
+        presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
+            .compressByte(nodeHolder.getMeasureNullValueIndex()[index].toByteArray()));
+        dataChunk.setPresence(presenceMeta);
+        List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
+        encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
+            createValueEncoderMeta(nodeHolder.getCompressionModel(), index))));
+        dataChunk.setEncoder_meta(encoderMetaList);
+        dataChunk.min_max
+            .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index]));
+        dataChunk.min_max
+            .addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[index]));
+      }
+      dataChunk.setEncoders(encodings);
+      colDataChunks.add(dataChunk);
+    }
+    return colDataChunks;
+  }
+
+  public static DataChunk3 getDataChunk3(List<NodeHolder> nodeHolderList,
+      List<ColumnSchema> columnSchenma, SegmentProperties segmentProperties, int index,
+      boolean isDimensionColumn) throws IOException {
+    List<DataChunk2> dataChunksList =
+        getDatachunk2(nodeHolderList, columnSchenma, segmentProperties, index, isDimensionColumn);
+    int offset = 0;
+    DataChunk3 dataChunk = new DataChunk3();
+    List<Integer> pageOffsets = new ArrayList<>();
+    List<Integer> pageLengths = new ArrayList<>();
+    int length = 0;
+    for (int i = 0; i < dataChunksList.size(); i++) {
+      pageOffsets.add(offset);
+      length =
+          dataChunksList.get(i).getData_page_length() + dataChunksList.get(i).getRle_page_length()
+              + dataChunksList.get(i).getRowid_page_length();
+      pageLengths.add(length);
+      offset += length;
+    }
+    dataChunk.setData_chunk_list(dataChunksList);
+    dataChunk.setPage_length(pageLengths);
+    dataChunk.setPage_offset(pageOffsets);
+    return dataChunk;
+  }
+
+  public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
+    ByteBuffer buffer = null;
+    if (valueEncoderMeta.getType() == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+      buffer = ByteBuffer.allocate(
+          (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+              + 3);
+      buffer.putChar(valueEncoderMeta.getType());
+      buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
+      buffer.putDouble((Double) valueEncoderMeta.getMinValue());
+      buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
+    } else if (valueEncoderMeta.getType() == CarbonCommonConstants.BIG_INT_MEASURE) {
+      buffer = ByteBuffer.allocate(
+          (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+              + 3);
+      buffer.putChar(valueEncoderMeta.getType());
+      buffer.putLong((Long) valueEncoderMeta.getMaxValue());
+      buffer.putLong((Long) valueEncoderMeta.getMinValue());
+      buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
+    } else {
+      buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+      buffer.putChar(valueEncoderMeta.getType());
+    }
+    buffer.putInt(valueEncoderMeta.getDecimal());
+    buffer.put(valueEncoderMeta.getDataTypeSelected());
+    buffer.flip();
+    return buffer.array();
+  }
+
+  public static byte[] getByteValueForMeasure(Object data, DataType dataType) {
+    ByteBuffer b = null;
+    switch (dataType) {
+      case DOUBLE:
+        b = ByteBuffer.allocate(8);
+        b.putDouble((Double) data);
+        b.flip();
+        return b.array();
+      case LONG:
+      case INT:
+      case SHORT:
+        b = ByteBuffer.allocate(8);
+        b.putLong((Long) data);
+        b.flip();
+        return b.array();
+      case DECIMAL:
+        return DataTypeUtil.bigDecimalToByte((BigDecimal)data);
+      default:
+        throw new IllegalArgumentException("Invalid data type");
+    }
+  }
+
+  public static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
+    ByteBuffer firstBuffer = null;
+    ByteBuffer secondBuffer = null;
+    switch (dataType) {
+      case DOUBLE:
+        firstBuffer = ByteBuffer.allocate(8);
+        firstBuffer.put(first);
+        secondBuffer = ByteBuffer.allocate(8);
+        secondBuffer.put(second);
+        firstBuffer.flip();
+        secondBuffer.flip();
+        return Double.compare(firstBuffer.getDouble(), secondBuffer.getDouble());
+      case LONG:
+      case INT:
+      case SHORT:
+        firstBuffer = ByteBuffer.allocate(8);
+        firstBuffer.put(first);
+        secondBuffer = ByteBuffer.allocate(8);
+        secondBuffer.put(second);
+        firstBuffer.flip();
+        secondBuffer.flip();
+        return Long.compare(firstBuffer.getLong(), secondBuffer.getLong());
+      case DECIMAL:
+        return DataTypeUtil.byteToBigDecimal(first)
+            .compareTo(DataTypeUtil.byteToBigDecimal(second));
+      default:
+        throw new IllegalArgumentException("Invalid data type");
+    }
+  }
+
 }

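serializeEncodeMetaUsingByteBuffer above writes a fixed binary layout: a 2-byte char for the measure type, then max, min, and unique values as three doubles or three longs (omitted for other types), then a 4-byte decimal count and a 1-byte dataTypeSelected; the "+ 3" in each allocation covers the char plus the trailing byte, and deserializeEncoderMetaNew in CarbonUtil (later in this patch) reads the same layout back. A round-trip sketch of the double layout with a plain ByteBuffer (the 'n' type marker is a stand-in, the real marker constants live in CarbonCommonConstants):

    import java.nio.ByteBuffer;

    // Sketch: write and re-read the V3 encoder-meta layout for a double
    // measure: char type | max | min | unique | int decimal | byte selector.
    public class EncoderMetaLayoutSketch {
      public static void main(String[] args) {
        ByteBuffer out = ByteBuffer.allocate(
            Character.BYTES + 3 * Double.BYTES + Integer.BYTES + Byte.BYTES);
        out.putChar('n');     // measure type marker (stand-in value)
        out.putDouble(100.5); // max value
        out.putDouble(-3.0);  // min value
        out.putDouble(0.25);  // unique (non-existence) value
        out.putInt(2);        // decimal count
        out.put((byte) 1);    // data type selected
        out.flip();

        ByteBuffer in = ByteBuffer.wrap(out.array());
        System.out.printf("type=%c max=%.1f min=%.1f unique=%.2f decimal=%d sel=%d%n",
            in.getChar(), in.getDouble(), in.getDouble(), in.getDouble(),
            in.getInt(), in.get());
      }
    }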
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 962d352..39c36ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -26,6 +26,7 @@ import java.util.Properties;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 
 public final class CarbonProperties {
@@ -85,6 +86,9 @@ public final class CarbonProperties {
     validateCarbonDataFileVersion();
     validateExecutorStartUpTime();
     validatePrefetchBufferSize();
+    validateNumberOfPagesPerBlocklet();
+    validateNumberOfColumnPerIORead();
+    validateNumberOfRowsPerBlockletColumnPage();
   }
 
   private void validatePrefetchBufferSize() {
@@ -107,6 +111,93 @@ public final class CarbonProperties {
     }
   }
 
+  /**
+   * This method validates the number of pages per blocklet column
+   */
+  private void validateNumberOfPagesPerBlocklet() {
+    String numberOfPagePerBlockletColumnString = carbonProperties
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+            CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+    try {
+      short numberOfPagePerBlockletColumn = Short.parseShort(numberOfPagePerBlockletColumnString);
+      if (numberOfPagePerBlockletColumn
+          < CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MIN
+          || numberOfPagePerBlockletColumn
+          > CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_MAX) {
+        LOGGER.info(
+            "The Number Of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString
+                + "\" is invalid. Using the default value \""
+                + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE + "\"");
+        carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+            CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info(
+          "The Number Of pages per blocklet column value \"" + numberOfPagePerBlockletColumnString
+              + "\" is invalid. Using the default value \""
+              + CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE + "\"");
+      carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN,
+          CarbonV3DataFormatConstants.NUMBER_OF_PAGE_IN_BLOCKLET_COLUMN_DEFAULT_VALUE);
+    }
+  }
+
+  /**
+   * This method validates the number of columns read in one IO
+   */
+  private void validateNumberOfColumnPerIORead() {
+    String numberofColumnPerIOString = carbonProperties
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+            CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+    try {
+      short numberofColumnPerIO = Short.parseShort(numberofColumnPerIOString);
+      if (numberofColumnPerIO < CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MIN
+          || numberofColumnPerIO > CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_MAX) {
+        LOGGER.info("The Number Of pages per blocklet column value \"" + numberofColumnPerIOString
+            + "\" is invalid. Using the default value \""
+            + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+        carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+            CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The Number Of pages per blocklet column value \"" + numberofColumnPerIOString
+          + "\" is invalid. Using the default value \""
+          + CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+      carbonProperties.setProperty(CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO,
+          CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO_DEFAULTVALUE);
+    }
+  }
+
+  /**
+   * This method validates the number of rows per blocklet column page
+   */
+  private void validateNumberOfRowsPerBlockletColumnPage() {
+    String numberOfRowsPerBlockletColumnPageString = carbonProperties
+        .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+            CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+    try {
+      short numberOfRowsPerBlockletColumnPage =
+          Short.parseShort(numberOfRowsPerBlockletColumnPageString);
+      if (numberOfRowsPerBlockletColumnPage
+          < CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MIN
+          || numberOfRowsPerBlockletColumnPage
+          > CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_MAX) {
+        LOGGER.info("The Number Of rows per blocklet column pages value \""
+            + numberOfRowsPerBlockletColumnPageString + "\" is invalid. Using the default value \""
+            + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT + "\"");
+        carbonProperties
+            .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+                CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.info("The Number Of rows per blocklet column pages value \""
+          + numberOfRowsPerBlockletColumnPageString + "\" is invalid. Using the default value \""
+          + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT + "\"");
+      carbonProperties
+          .setProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+              CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
+    }
+  }
+
   private void validateBadRecordsLocation() {
     String badRecordsLocation =
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
@@ -288,17 +379,16 @@ public final class CarbonProperties {
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
     if (carbondataFileVersionString == null) {
       // use default property if user does not specify version property
-      carbonProperties
-          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
-              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+      carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+          CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
     } else {
       try {
         ColumnarFormatVersion.valueOf(carbondataFileVersionString);
       } catch (IllegalArgumentException e) {
         // use default property if user specifies an invalid version property
-        LOGGER.warn("Specified file version property is invalid: " +
-            carbondataFileVersionString + ". Using " +
-            CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as default file version");
+        LOGGER.warn("Specified file version property is invalid: " + carbondataFileVersionString
+            + ". Using " + CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
+            + " as default file version");
         carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
             CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
       }
@@ -569,6 +659,7 @@ public final class CarbonProperties {
 
   /**
    * Returns configured update deleta files value for IUD compaction
+   *
    * @return numberOfDeltaFilesThreshold
    */
   public int getNoUpdateDeltaFilesThresholdForIUDCompaction() {
@@ -588,8 +679,7 @@ public final class CarbonProperties {
       }
     } catch (NumberFormatException e) {
       LOGGER.error("The specified value for property "
-          + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
-          + "is incorrect."
+          + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + " is incorrect."
           + " Correct value should be in range of 0 -10000. Taking the default value.");
       numberOfDeltaFilesThreshold = Integer
           .parseInt(CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
@@ -599,6 +689,7 @@ public final class CarbonProperties {
 
   /**
    * Returns configured delete deleta files value for IUD compaction
+   *
    * @return numberOfDeltaFilesThreshold
    */
   public int getNoDeleteDeltaFilesThresholdForIUDCompaction() {
@@ -618,8 +709,7 @@ public final class CarbonProperties {
       }
     } catch (NumberFormatException e) {
       LOGGER.error("The specified value for property "
-          + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
-          + "is incorrect."
+          + CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + " is incorrect."
           + " Correct value should be in range of 0 -10000. Taking the default value.");
       numberOfDeltaFilesThreshold = Integer
           .parseInt(CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);

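The three validators added above share one shape: read the property with its default, parse it as a short, range-check it, and reset the property to the default on any failure. A hypothetical generic helper capturing that pattern (the key and bounds below are made up for illustration; the real class keeps three hand-written copies):

    import java.util.Properties;

    // Sketch: one bounded-short validator instead of three copies; resets
    // the property to its default when the value is absent, unparsable,
    // or out of range.
    public class BoundedShortValidatorSketch {
      static void validateBoundedShort(Properties props, String key,
          String defaultValue, short min, short max) {
        String raw = props.getProperty(key, defaultValue);
        try {
          short value = Short.parseShort(raw);
          if (value < min || value > max) {
            props.setProperty(key, defaultValue); // out of range
          }
        } catch (NumberFormatException e) {
          props.setProperty(key, defaultValue);   // unparsable
        }
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("example.pages.per.blocklet.column", "9999");
        validateBoundedShort(props, "example.pages.per.blocklet.column", "8",
            (short) 1, (short) 32);
        System.out.println(props.getProperty("example.pages.per.blocklet.column")); // 8
      }
    }

Folding the copies into such a helper would also keep the log messages from drifting between validators.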
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index b9a96d2..5a656e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -73,6 +73,7 @@ import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.DataChunk3;
 
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -471,6 +472,28 @@ public final class CarbonUtil {
             numberCompressor.unCompress(indexMap, 0, indexMap.length));
   }
 
+  public static int[] getUnCompressColumnIndex(int totalLength, ByteBuffer buffer, int offset) {
+    buffer.position(offset);
+    int indexDataLength = buffer.getInt();
+    int indexMapLength = totalLength - indexDataLength - CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    int[] indexData = getIntArray(buffer, buffer.position(), indexDataLength);
+    int[] indexMap = getIntArray(buffer, buffer.position(), indexMapLength);
+    return UnBlockIndexer.uncompressIndex(indexData, indexMap);
+  }
+
+  public static int[] getIntArray(ByteBuffer data, int offset, int length) {
+    if (length == 0) {
+      return new int[0];
+    }
+    data.position(offset);
+    int[] intArray = new int[length / 2];
+    int index = 0;
+    while (index < intArray.length) {
+      intArray[index++] = data.getShort();
+    }
+    return intArray;
+  }
+
   /**
    * Convert int array to Integer list
    *
@@ -1233,6 +1256,18 @@ public final class CarbonUtil {
     }, offset, length);
   }
 
+  public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length)
+      throws IOException {
+    byte[] data = new byte[length];
+    dataChunkBuffer.position(offset);
+    dataChunkBuffer.get(data);
+    return (DataChunk3) read(data, new ThriftReader.TBaseCreator() {
+      @Override public TBase create() {
+        return new DataChunk3();
+      }
+    }, 0, length);
+  }
+
   public static DataChunk2 readDataChunk(ByteBuffer dataChunkBuffer, int offset, int length)
       throws IOException {
     byte[] data = new byte[length];
@@ -1293,6 +1328,35 @@ public final class CarbonUtil {
     return meta;
   }
 
+  public static ValueEncoderMeta deserializeEncoderMetaNew(byte[] encodeMeta) {
+    ByteBuffer buffer = ByteBuffer.wrap(encodeMeta);
+    char measureType = buffer.getChar();
+    ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
+    valueEncoderMeta.setType(measureType);
+    switch (measureType) {
+      case CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE:
+        valueEncoderMeta.setMaxValue(buffer.getDouble());
+        valueEncoderMeta.setMinValue(buffer.getDouble());
+        valueEncoderMeta.setUniqueValue(buffer.getDouble());
+        break;
+      case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+        valueEncoderMeta.setMaxValue(0.0);
+        valueEncoderMeta.setMinValue(0.0);
+        valueEncoderMeta.setUniqueValue(0.0);
+        break;
+      case CarbonCommonConstants.BIG_INT_MEASURE:
+        valueEncoderMeta.setMaxValue(buffer.getLong());
+        valueEncoderMeta.setMinValue(buffer.getLong());
+        valueEncoderMeta.setUniqueValue(buffer.getLong());
+        break;
+      default:
+        throw new IllegalArgumentException("invalid measure type");
+    }
+    valueEncoderMeta.setDecimal(buffer.getInt());
+    valueEncoderMeta.setDataTypeSelected(buffer.get());
+    return valueEncoderMeta;
+  }
+
   /**
    * Below method will be used to convert indexes in range
    * Indexes=[0,1,2,3,4,5,6,7,8,9]
@@ -1454,5 +1518,51 @@ public final class CarbonUtil {
         return null;
     }
   }
+
+  /**
+   * Below method will be used to convert byte data to surrogate key based
+   * column value size
+   *
+   * @param data                data
+   * @param startOffsetOfData   start offset of data
+   * @param eachColumnValueSize size of each column value
+   * @return surrogate key
+   */
+  public static int getSurrogateInternal(byte[] data, int startOffsetOfData,
+      int eachColumnValueSize) {
+    int surrogate = 0;
+    switch (eachColumnValueSize) {
+      case 1:
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData] & 0xFF;
+        return surrogate;
+      case 2:
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData] & 0xFF;
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData + 1] & 0xFF;
+        return surrogate;
+      case 3:
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData] & 0xFF;
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData + 1] & 0xFF;
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData + 2] & 0xFF;
+        return surrogate;
+      case 4:
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData] & 0xFF;
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData + 1] & 0xFF;
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData + 2] & 0xFF;
+        surrogate <<= 8;
+        surrogate ^= data[startOffsetOfData + 3] & 0xFF;
+        return surrogate;
+      default:
+        throw new IllegalArgumentException("Int cannot me more than 4 bytes");
+    }
+  }
 }
 

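getSurrogateInternal above unrolls a big-endian read of one to four bytes into an int surrogate key, masking each byte with 0xFF so sign extension cannot corrupt the value. The four unrolled cases are equivalent to this loop form (a sketch, not the project's code):

    // Sketch: big-endian packing of 1-4 bytes into an int, the loop
    // equivalent of the unrolled switch in getSurrogateInternal.
    public class SurrogateSketch {
      static int getSurrogate(byte[] data, int offset, int size) {
        int surrogate = 0;
        for (int i = 0; i < size; i++) {
          surrogate <<= 8;
          surrogate ^= data[offset + i] & 0xFF;
        }
        return surrogate;
      }

      public static void main(String[] args) {
        byte[] key = { 0x00, 0x01, 0x02, (byte) 0xFF };
        System.out.println(getSurrogate(key, 1, 3)); // 0x0102FF = 66303
      }
    }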
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
index 08bfd6d..153fcb9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -56,8 +56,11 @@ public class DataFileFooterConverterFactory {
     switch (version) {
       case V1:
         return new DataFileFooterConverter();
-      default:
+      case V2:
         return new DataFileFooterConverter2();
+      case V3:
+      default:
+        return new DataFileFooterConverterV3();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
new file mode 100644
index 0000000..1ab3133
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterV3.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.reader.CarbonFooterReader;
+import org.apache.carbondata.format.FileFooter;
+
+public class DataFileFooterConverterV3 extends AbstractDataFileFooterConverter {
+
+  /**
+   * Below method will be used to convert thrift file meta to wrapper file meta
+   * This method will read the footer from footer offset present in the data file
+   * 1. It will set the stream offset
+   * 2. It will read the footer data from file
+   * 3. parse the footer to thrift object
+   * 4. convert to wrapper object
+   *
+   * @param tableBlockInfo
+   *        table block info
+   * @return data file footer
+   */
+  @Override public DataFileFooter readDataFileFooter(TableBlockInfo tableBlockInfo)
+      throws IOException {
+    DataFileFooter dataFileFooter = new DataFileFooter();
+    CarbonFooterReader reader =
+        new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
+    FileFooter footer = reader.readFooter();
+    dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
+    dataFileFooter.setNumberOfRows(footer.getNum_rows());
+    dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    List<org.apache.carbondata.format.ColumnSchema> table_columns = footer.getTable_columns();
+    for (int i = 0; i < table_columns.size(); i++) {
+      columnSchemaList.add(thriftColumnSchmeaToWrapperColumnSchema(table_columns.get(i)));
+    }
+    dataFileFooter.setColumnInTable(columnSchemaList);
+
+    List<org.apache.carbondata.format.BlockletIndex> leaf_node_indices_Thrift =
+        footer.getBlocklet_index_list();
+    List<BlockletIndex> blockletIndexList = new ArrayList<BlockletIndex>();
+    for (int i = 0; i < leaf_node_indices_Thrift.size(); i++) {
+      BlockletIndex blockletIndex = getBlockletIndex(leaf_node_indices_Thrift.get(i));
+      blockletIndexList.add(blockletIndex);
+    }
+    List<org.apache.carbondata.format.BlockletInfo3> leaf_node_infos_Thrift =
+        footer.getBlocklet_info_list3();
+    List<BlockletInfo> blockletInfoList = new ArrayList<BlockletInfo>();
+    for (int i = 0; i < leaf_node_infos_Thrift.size(); i++) {
+      BlockletInfo blockletInfo = getBlockletInfo(leaf_node_infos_Thrift.get(i),
+          getNumberOfDimensionColumns(columnSchemaList));
+      blockletInfo.setBlockletIndex(blockletIndexList.get(i));
+      blockletInfoList.add(blockletInfo);
+    }
+    dataFileFooter.setBlockletList(blockletInfoList);
+    dataFileFooter.setBlockletIndex(getBlockletIndexForDataFileFooter(blockletIndexList));
+    return dataFileFooter;
+  }
+
+  /**
+   * Below method is to convert the blocklet info of the thrift to wrapper
+   * blocklet info
+   *
+   * @param blockletInfoThrift blocklet info of the thrift
+   * @return blocklet info wrapper
+   */
+  private BlockletInfo getBlockletInfo(
+      org.apache.carbondata.format.BlockletInfo3 blockletInfoThrift, int numberOfDimensionColumns) {
+    BlockletInfo blockletInfo = new BlockletInfo();
+    List<Long> dimensionColumnChunkOffsets =
+        blockletInfoThrift.getColumn_data_chunks_offsets().subList(0, numberOfDimensionColumns);
+    List<Long> measureColumnChunksOffsets = blockletInfoThrift.getColumn_data_chunks_offsets()
+        .subList(numberOfDimensionColumns,
+            blockletInfoThrift.getColumn_data_chunks_offsets().size());
+    List<Integer> dimensionColumnChunkLength =
+        blockletInfoThrift.getColumn_data_chunks_length().subList(0, numberOfDimensionColumns);
+    List<Integer> measureColumnChunksLength = blockletInfoThrift.getColumn_data_chunks_length()
+        .subList(numberOfDimensionColumns,
+            blockletInfoThrift.getColumn_data_chunks_length().size());
+    blockletInfo.setDimensionChunkOffsets(dimensionColumnChunkOffsets);
+    blockletInfo.setMeasureChunkOffsets(measureColumnChunksOffsets);
+    blockletInfo.setDimensionChunksLength(dimensionColumnChunkLength);
+    blockletInfo.setMeasureChunksLength(measureColumnChunksLength);
+    blockletInfo.setNumberOfRows(blockletInfoThrift.getNum_rows());
+    blockletInfo.setDimensionOffset(blockletInfoThrift.getDimension_offsets());
+    blockletInfo.setMeasureOffsets(blockletInfoThrift.getMeasure_offsets());
+    return blockletInfo;
+  }
+
+  /**
+   * Returns the number of dimension columns in the carbon column schema.
+   * Columnar dimensions are counted individually, while all columns that
+   * share a column group are counted once.
+   *
+   * @param columnSchemaList column schema list
+   * @return number of dimension columns
+   */
+  private int getNumberOfDimensionColumns(List<ColumnSchema> columnSchemaList) {
+    int numberOfDimensionColumns = 0;
+    int previousColumnGroupId = -1;
+    ColumnSchema columnSchema = null;
+    for (int i = 0; i < columnSchemaList.size(); i++) {
+      columnSchema = columnSchemaList.get(i);
+      if (columnSchema.isDimensionColumn() && columnSchema.isColumnar()) {
+        numberOfDimensionColumns++;
+      } else if (columnSchema.isDimensionColumn()) {
+        if (previousColumnGroupId != columnSchema.getColumnGroupId()) {
+          previousColumnGroupId = columnSchema.getColumnGroupId();
+          numberOfDimensionColumns++;
+        }
+      } else {
+        break;
+      }
+    }
+    return numberOfDimensionColumns;
+  }
+
+}
+

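For context on the splitting logic in getBlockletInfo above, here is a minimal, self-contained sketch of how the combined chunk-offset list is divided into dimension and measure sublists by the dimension-column count. The offset values and the dimension count below are hypothetical; in the reader they come from the thrift BlockletInfo3 footer entry.

import java.util.Arrays;
import java.util.List;

// Minimal sketch of the subList split used by getBlockletInfo above.
// The offsets and the dimension count are hypothetical placeholder values.
public class ChunkSplitSketch {
  public static void main(String[] args) {
    List<Long> allOffsets = Arrays.asList(0L, 1024L, 2048L, 4096L);
    int numberOfDimensionColumns = 2; // first two chunks are dimensions
    List<Long> dimensionOffsets = allOffsets.subList(0, numberOfDimensionColumns);
    List<Long> measureOffsets =
        allOffsets.subList(numberOfDimensionColumns, allOffsets.size());
    System.out.println("dimension chunk offsets: " + dimensionOffsets); // [0, 1024]
    System.out.println("measure chunk offsets: " + measureOffsets);     // [2048, 4096]
  }
}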
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
new file mode 100644
index 0000000..d46b806
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+
+public class NodeHolder {
+  /**
+   * keyArray
+   */
+  private byte[][] keyArray;
+
+  /**
+   * dataArray
+   */
+  private byte[][] dataArray;
+
+  /**
+   * measureLenght
+   */
+  private int[] measureLenght;
+
+  /**
+   * startKey
+   */
+  private byte[] startKey;
+
+  /**
+   * endKey
+   */
+  private byte[] endKey;
+
+  /**
+   * entryCount
+   */
+  private int entryCount;
+  /**
+   * keyLengths
+   */
+  private int[] keyLengths;
+
+  /**
+   * dataAfterCompression
+   */
+  private short[][] dataAfterCompression;
+
+  /**
+   * indexMap
+   */
+  private short[][] indexMap;
+
+  /**
+   * keyBlockIndexLength
+   */
+  private int[] keyBlockIndexLength;
+
+  /**
+   * isSortedKeyBlock
+   */
+  private boolean[] isSortedKeyBlock;
+
+  private byte[][] compressedIndex;
+
+  private byte[][] compressedIndexMap;
+
+  /**
+   * dataIndexMapLength
+   */
+  private int[] dataIndexMapLength;
+
+  /**
+   * dataIndexMapOffsets
+   */
+  private int[] dataIndexMapOffsets;
+
+  /**
+   * compressedDataIndex
+   */
+  private byte[][] compressedDataIndex;
+
+  /**
+   * column max data
+   */
+  private byte[][] columnMaxData;
+
+  /**
+   * column min data
+   */
+  private byte[][] columnMinData;
+
+  private byte[][] measureColumnMaxData;
+
+  private byte[][] measureColumnMinData;
+
+  /**
+   * compression model for numbers data block.
+   */
+  private WriterCompressModel compressionModel;
+
+  /**
+   * array of aggBlocks flag to identify the aggBlocks
+   */
+  private boolean[] aggBlocks;
+
+  /**
+   * all columns max value
+   */
+  private byte[][] allMaxValue;
+
+  /**
+   * all columns min value
+   */
+  private byte[][] allMinValue;
+
+  /**
+   * true if given index is colgroup block
+   */
+  private boolean[] colGrpBlock;
+
+  /**
+   * bit sets holding the indexes of measure values that are null
+   */
+  private BitSet[] measureNullValueIndex;
+
+  /**
+   * total length of dimension values
+   */
+  private int totalDimensionArrayLength;
+
+  /**
+   * total length of all measure values
+   */
+  private int totalMeasureArrayLength;
+
+  /**
+   * @return the keyArray
+   */
+  public byte[][] getKeyArray() {
+    return keyArray;
+  }
+
+  /**
+   * @param keyArray the keyArray to set
+   */
+  public void setKeyArray(byte[][] keyArray) {
+    this.keyArray = keyArray;
+  }
+
+  /**
+   * @return the dataArray
+   */
+  public byte[][] getDataArray() {
+    return dataArray;
+  }
+
+  /**
+   * @param dataArray the dataArray to set
+   */
+  public void setDataArray(byte[][] dataArray) {
+    this.dataArray = dataArray;
+  }
+
+  /**
+   * @return the measureLenght
+   */
+  public int[] getMeasureLenght() {
+    return measureLenght;
+  }
+
+  /**
+   * @param measureLenght the measureLenght to set
+   */
+  public void setMeasureLenght(int[] measureLenght) {
+    this.measureLenght = measureLenght;
+  }
+
+  /**
+   * @return the startKey
+   */
+  public byte[] getStartKey() {
+    return startKey;
+  }
+
+  /**
+   * @param startKey the startKey to set
+   */
+  public void setStartKey(byte[] startKey) {
+    this.startKey = startKey;
+  }
+
+  /**
+   * @return the endKey
+   */
+  public byte[] getEndKey() {
+    return endKey;
+  }
+
+  /**
+   * @param endKey the endKey to set
+   */
+  public void setEndKey(byte[] endKey) {
+    this.endKey = endKey;
+  }
+
+  /**
+   * @return the entryCount
+   */
+  public int getEntryCount() {
+    return entryCount;
+  }
+
+  /**
+   * @param entryCount the entryCount to set
+   */
+  public void setEntryCount(int entryCount) {
+    this.entryCount = entryCount;
+  }
+
+  /**
+   * @return the keyLengths
+   */
+  public int[] getKeyLengths() {
+    return keyLengths;
+  }
+
+  public void setKeyLengths(int[] keyLengths) {
+    this.keyLengths = keyLengths;
+  }
+
+  /**
+   * @return the keyBlockIndexLength
+   */
+  public int[] getKeyBlockIndexLength() {
+    return keyBlockIndexLength;
+  }
+
+  /**
+   * @param keyBlockIndexLength the keyBlockIndexLength to set
+   */
+  public void setKeyBlockIndexLength(int[] keyBlockIndexLength) {
+    this.keyBlockIndexLength = keyBlockIndexLength;
+  }
+
+  /**
+   * @return the isSortedKeyBlock
+   */
+  public boolean[] getIsSortedKeyBlock() {
+    return isSortedKeyBlock;
+  }
+
+  /**
+   * @param isSortedKeyBlock the isSortedKeyBlock to set
+   */
+  public void setIsSortedKeyBlock(boolean[] isSortedKeyBlock) {
+    this.isSortedKeyBlock = isSortedKeyBlock;
+  }
+
+  /**
+   * @return the compressedIndex
+   */
+  public byte[][] getCompressedIndex() {
+    return compressedIndex;
+  }
+
+  public void setCompressedIndex(byte[][] compressedIndex) {
+    this.compressedIndex = compressedIndex;
+  }
+
+  /**
+   * @return the compressedIndexMap
+   */
+  public byte[][] getCompressedIndexMap() {
+    return compressedIndexMap;
+  }
+
+  /**
+   * @param compressedIndexMap the compressedIndexMap to set
+   */
+  public void setCompressedIndexMap(byte[][] compressedIndexMap) {
+    this.compressedIndexMap = compressedIndexMap;
+  }
+
+  /**
+   * @return the compressedDataIndex
+   */
+  public byte[][] getCompressedDataIndex() {
+    return compressedDataIndex;
+  }
+
+  /**
+   * @param compressedDataIndex the compressedDataIndex to set
+   */
+  public void setCompressedDataIndex(byte[][] compressedDataIndex) {
+    this.compressedDataIndex = compressedDataIndex;
+  }
+
+  /**
+   * @return the dataIndexMapLength
+   */
+  public int[] getDataIndexMapLength() {
+    return dataIndexMapLength;
+  }
+
+  /**
+   * @param dataIndexMapLength the dataIndexMapLength to set
+   */
+  public void setDataIndexMapLength(int[] dataIndexMapLength) {
+    this.dataIndexMapLength = dataIndexMapLength;
+  }
+
+  public byte[][] getColumnMaxData() {
+    return this.columnMaxData;
+  }
+
+  public void setColumnMaxData(byte[][] columnMaxData) {
+    this.columnMaxData = columnMaxData;
+  }
+
+  public byte[][] getColumnMinData() {
+    return this.columnMinData;
+  }
+
+  public void setColumnMinData(byte[][] columnMinData) {
+    this.columnMinData = columnMinData;
+  }
+
+  public WriterCompressModel getCompressionModel() {
+    return compressionModel;
+  }
+
+  public void setCompressionModel(WriterCompressModel compressionModel) {
+    this.compressionModel = compressionModel;
+  }
+
+  /**
+   * returns the array of aggBlocks flags used to identify the agg blocks
+   *
+   * @return the aggBlocks flag array
+   */
+  public boolean[] getAggBlocks() {
+    return aggBlocks;
+  }
+
+  /**
+   * sets the array of aggBlocks flags used to identify the agg blocks
+   *
+   * @param aggBlocks the aggBlocks flag array
+   */
+  public void setAggBlocks(boolean[] aggBlocks) {
+    this.aggBlocks = aggBlocks;
+  }
+
+  /**
+   * @return true at a given index if that block is a column group block
+   */
+  public boolean[] getColGrpBlocks() {
+    return this.colGrpBlock;
+  }
+
+  /**
+   * @param colGrpBlock true if block is column group
+   */
+  public void setColGrpBlocks(boolean[] colGrpBlock) {
+    this.colGrpBlock = colGrpBlock;
+  }
+
+  /**
+   * @return the measureNullValueIndex
+   */
+  public BitSet[] getMeasureNullValueIndex() {
+    return measureNullValueIndex;
+  }
+
+  /**
+   * @param measureNullValueIndex the measureNullValueIndex to set
+   */
+  public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
+    this.measureNullValueIndex = measureNullValueIndex;
+  }
+
+  public int getTotalDimensionArrayLength() {
+    return totalDimensionArrayLength;
+  }
+
+  public void setTotalDimensionArrayLength(int totalDimensionArrayLength) {
+    this.totalDimensionArrayLength = totalDimensionArrayLength;
+  }
+
+  public int getTotalMeasureArrayLength() {
+    return totalMeasureArrayLength;
+  }
+
+  public void setTotalMeasureArrayLength(int totalMeasureArrayLength) {
+    this.totalMeasureArrayLength = totalMeasureArrayLength;
+  }
+
+  public byte[][] getMeasureColumnMaxData() {
+    return measureColumnMaxData;
+  }
+
+  public void setMeasureColumnMaxData(byte[][] measureColumnMaxData) {
+    this.measureColumnMaxData = measureColumnMaxData;
+  }
+
+  public byte[][] getMeasureColumnMinData() {
+    return measureColumnMinData;
+  }
+
+  public void setMeasureColumnMinData(byte[][] measureColumnMinData) {
+    this.measureColumnMinData = measureColumnMinData;
+  }
+}

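NodeHolder is a plain value holder handed from the fact data handler to the writer. As a hedged illustration (the input arrays and the helper below are hypothetical, not the handler's actual call sequence), a writer-side step might assemble one like this:

import org.apache.carbondata.core.util.NodeHolder;

// Hedged sketch: assembling a NodeHolder for one blocklet. The input arrays
// are hypothetical placeholders; the real handler derives them from the
// sorted and compressed blocklet data.
public class NodeHolderSketch {
  static NodeHolder buildHolder(byte[][] dimensionChunks, byte[][] measureChunks,
      byte[] startKey, byte[] endKey, int rowCount) {
    NodeHolder holder = new NodeHolder();
    holder.setKeyArray(dimensionChunks); // compressed dimension chunks
    holder.setDataArray(measureChunks);  // compressed measure chunks
    holder.setStartKey(startKey);
    holder.setEndKey(endKey);
    holder.setEntryCount(rowCount);
    int totalDimensionLength = 0;
    for (byte[] chunk : dimensionChunks) {
      totalDimensionLength += chunk.length;
    }
    holder.setTotalDimensionArrayLength(totalDimensionLength);
    int totalMeasureLength = 0;
    for (byte[] chunk : measureChunks) {
      totalMeasureLength += chunk.length;
    }
    holder.setTotalMeasureArrayLength(totalMeasureLength);
    return holder;
  }
}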
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 3935bdc..2c6c890 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -155,7 +155,7 @@ public class CarbonMetadataUtilTest {
     segmentInfo.setNum_cols(0);
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     IndexHeader indexHeader = new IndexHeader();
-    indexHeader.setVersion(2);
+    indexHeader.setVersion(3);
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
     indexHeader.setBucket_id(0);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 759fbf7..3114ee1 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -127,10 +127,21 @@ struct DataChunk2{
     7: optional SortState sort_state;
     8: optional list<schema.Encoding> encoders; // The List of encoders overriden at node level
     9: optional list<binary> encoder_meta; // extra information required by encoders
-}
+    10: optional BlockletMinMaxIndex min_max; // min/max index of this data chunk (page)
+    11: optional i32 numberOfRowsInpage; // number of rows in this page
+}
 
 
 /**
+* Represents a column chunk in the V3 format: a list of pages (DataChunk2), together with the offset and length of each page within the chunk.
+**/
+struct DataChunk3{
+    1: required list<DataChunk2> data_chunk_list; // list of data chunks (pages)
+    2: optional list<i32> page_offset; // offset of each page
+    3: optional list<i32> page_length; // length of each page
+}
+/**
 *	Information about a blocklet
 */
 struct BlockletInfo{
@@ -146,7 +157,16 @@ struct BlockletInfo2{
     2: required list<i64> column_data_chunks_offsets;	// Information about offsets all column chunks in this blocklet
     3: required list<i16> column_data_chunks_length;	// Information about length all column chunks in this blocklet
 }
-
+/**
+*	Information about a blocklet
+*/
+struct BlockletInfo3{
+    1: required i32 num_rows;	// Number of rows in this blocklet
+    2: required list<i64> column_data_chunks_offsets;	// Offsets of all column chunks in this blocklet
+    3: required list<i32> column_data_chunks_length;	// Lengths of all column chunks in this blocklet
+    4: required i64 dimension_offsets;	// Start offset of the dimension column data for this blocklet
+    5: required i64 measure_offsets;	// Start offset of the measure column data for this blocklet
+}
 /**
 * Footer for indexed carbon file
 */
@@ -158,7 +178,8 @@ struct FileFooter{
     5: required list<BlockletIndex> blocklet_index_list;	// blocklet index of all blocklets in this file
     6: optional list<BlockletInfo> blocklet_info_list;	// Information about blocklets of all columns in this file
     7: optional list<BlockletInfo2> blocklet_info_list2;	// Information about blocklets of all columns in this file
-    8: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
+    8: optional list<BlockletInfo3> blocklet_info_list3;	// Information about blocklets of all columns in this file
+    9: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
 }
 
 /**

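To make the new V3 page layout concrete, the sketch below shows how a reader could compute absolute page positions from the page_offset and page_length lists just defined. Treating page_offset values as relative to the chunk start is our assumption here; the thrift comments do not state it.

import java.util.Arrays;
import java.util.List;

// Sketch: locating pages inside a V3 column chunk from the page_offset and
// page_length lists of DataChunk3. The chunk position and the list contents
// are hypothetical values; page offsets are assumed relative to chunk start.
public class PageLocatorSketch {
  public static void main(String[] args) {
    long chunkStartInFile = 4096L; // hypothetical chunk position in the file
    List<Integer> pageOffset = Arrays.asList(0, 1000, 2100);
    List<Integer> pageLength = Arrays.asList(1000, 1100, 900);
    for (int i = 0; i < pageOffset.size(); i++) {
      long pageStart = chunkStartInFile + pageOffset.get(i);
      System.out.println("page " + i + ": start=" + pageStart
          + ", length=" + pageLength.get(i));
    }
  }
}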
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 828ece8..3f75cd1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
 import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;
+import org.apache.carbondata.processing.store.writer.v3.CarbonFactDataWriterImplV3;
 
 /**
  * Factory class to get the writer instance
@@ -62,8 +63,12 @@ public class CarbonDataWriterFactory {
     switch (version) {
       case V1:
         return new CarbonFactDataWriterImplV1(carbonDataWriterVo);
-      default:
+      case V2:
         return new CarbonFactDataWriterImplV2(carbonDataWriterVo);
+      case V3:
+        return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
+      default:
+        return new CarbonFactDataWriterImplV3(carbonDataWriterVo);
     }
   }
 

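The handler resolves its writer through the factory above using the configured format version, as the CarbonFactDataHandlerColumnar diff below shows. A minimal sketch of that call path follows; the import path for ColumnarFormatVersion is assumed, the other imports are taken from the diffs themselves.

import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.processing.store.CarbonDataWriterFactory;
import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;

// Sketch of the writer selection added above: the format version configured
// in CarbonProperties picks the V1, V2, or V3 implementation. The
// CarbonDataWriterVo is assumed to be populated elsewhere, as in
// getDataWriterVo(keyBlockSize).
public class WriterSelectionSketch {
  static CarbonFactDataWriter<?> createWriter(CarbonDataWriterVo writerVo) {
    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
    return CarbonDataWriterFactory.getInstance().getFactDataWriter(version, writerVo);
  }
}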
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2cf1104d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index bf66700..0699167 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -40,9 +40,11 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
@@ -59,6 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.mdkeygen.file.FileManager;
@@ -70,7 +73,6 @@ import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
 import org.apache.carbondata.processing.store.colgroup.DataHolder;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
 
@@ -257,6 +259,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private int bucketNumber;
 
   /**
+   * current data format version
+   */
+  private ColumnarFormatVersion version;
+
+  /**
    * CarbonFactDataHandler constructor
    */
   public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -326,6 +333,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       }
       aggKeyBlock = arrangeUniqueBlockType(aggKeyBlock);
     }
+    version = CarbonProperties.getInstance().getFormatVersion();
   }
 
   private void initParameters(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
@@ -476,7 +484,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       } else if (type[i] == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
         max[i] = -Double.MAX_VALUE;
       } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        max[i] = new BigDecimal(0.0);
+        max[i] = new BigDecimal(-Double.MAX_VALUE);
       } else {
         max[i] = 0.0;
       }
@@ -748,9 +756,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   // TODO remove after kettle flow is removed
   private NodeHolder getNodeHolderObject(byte[][] dataHolderLocal, byte[][] byteArrayValues,
       int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
-      WriterCompressModel compressionModel, byte[][] noDictionaryData,
-      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey)
-      throws CarbonDataWriterException {
+      WriterCompressModel compressionModel, byte[][] noDictionaryData, byte[] noDictionaryStartKey,
+      byte[] noDictionaryEndKey) throws CarbonDataWriterException {
     byte[][][] noDictionaryColumnsData = null;
     List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
     int complexColCount = getComplexColsCount();
@@ -836,9 +843,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       if (dimensionType[i]) {
         dictionaryColumnCount++;
         if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-          submit.add(executorService
-              .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
-                  true, isUseInvertedIndex[i])));
+          submit.add(executorService.submit(
+              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+                  isUseInvertedIndex[i])));
         } else {
           submit.add(
               executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -876,8 +883,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private NodeHolder getNodeHolderObjectWithOutKettle(byte[][] dataHolderLocal,
       byte[][] byteArrayValues, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
       WriterCompressModel compressionModel, byte[][][] noDictionaryData,
-      byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey)
-      throws CarbonDataWriterException {
+      byte[][] noDictionaryStartKey, byte[][] noDictionaryEndKey) throws CarbonDataWriterException {
     byte[][][] noDictionaryColumnsData = null;
     List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
     int complexColCount = getComplexColsCount();
@@ -907,7 +913,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             int keyLength = splitKey[j].length;
             byte[] newKey = new byte[keyLength + 2];
             ByteBuffer buffer = ByteBuffer.wrap(newKey);
-            buffer.putShort((short)keyLength);
+            buffer.putShort((short) keyLength);
             System.arraycopy(splitKey[j], 0, newKey, 2, keyLength);
             noDictionaryColumnsData[j][i] = newKey;
           }
@@ -963,9 +969,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       if (dimensionType[i]) {
         dictionaryColumnCount++;
         if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-          submit.add(executorService
-              .submit(new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(),
-                  true, isUseInvertedIndex[i])));
+          submit.add(executorService.submit(
+              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), true,
+                  isUseInvertedIndex[i])));
         } else {
           submit.add(
               executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
@@ -1008,7 +1014,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey);
   }
 
-
   /**
    * DataHolder will have all row mdkey data
    *
@@ -1150,6 +1155,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
     return decimalPlaces;
   }
+
   /**
    * This method will be used to update the max value for each measure
    */
@@ -1220,7 +1226,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.BLOCKLET_SIZE,
             CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL));
-    LOGGER.info("Blocklet Size: " + blockletSize);
+    if (version == ColumnarFormatVersion.V3) {
+      this.blockletSize = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE,
+              CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT));
+    }
+    LOGGER.info("Number of rows per column blocklet: " + blockletSize);
     dataRows = new ArrayList<>(this.blockletSize);
     int dimSet =
         Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
@@ -1280,8 +1291,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         .getBlockKeySize());
     System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
         blockKeySize.length - noOfColStore);
-    this.dataWriter =
-        getFactDataWriter(keyBlockSize);
+    this.dataWriter = getFactDataWriter(keyBlockSize);
     this.dataWriter.setIsNoDictionary(isNoDictionary);
     // initialize the channel;
     this.dataWriter.initializeWriter();
@@ -1377,7 +1387,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @return data writer instance
    */
   private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
-    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
     return CarbonDataWriterFactory.getInstance()
         .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
   }
@@ -1620,8 +1629,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
     @Override public IndexStorage call() throws Exception {
       if (isUseInvertedIndex) {
-        return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
-            isSortRequired);
+        if (version == ColumnarFormatVersion.V3) {
+          return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
+              isSortRequired);
+        } else {
+          return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
+              isSortRequired);
+        }
       } else {
         return new BlockIndexerStorageForNoInvertedIndex(this.data, isCompressionReq,
             isNoDictionary);