You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/12/01 09:53:55 UTC

[2/5] incubator-carbondata git commit: Improve first time query performance

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
index 7663738..bf61be2 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/impl/QueryExecutorProperties.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder;
 import org.apache.carbondata.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.scan.filter.GenericQueryType;
@@ -79,6 +80,8 @@ public class QueryExecutorProperties {
    * all the complex dimension which is on filter
    */
   public Set<CarbonDimension> complexFilterDimension;
+
+  public Set<CarbonMeasure> filterMeasures;
   /**
    * to record the query execution details phase wise
    */
@@ -91,5 +94,4 @@ public class QueryExecutorProperties {
    * list of blocks in which query will be executed
    */
   protected List<AbstractIndex> dataBlocks;
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
index d84a183..2e80984 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/infos/BlockExecutionInfo.java
@@ -116,12 +116,12 @@ public class BlockExecutionInfo {
   /**
    * will be used to read the dimension block from file
    */
-  private int[] allSelectedDimensionBlocksIndexes;
+  private int[][] allSelectedDimensionBlocksIndexes;
 
   /**
    * will be used to read the measure block from file
    */
-  private int[] allSelectedMeasureBlocksIndexes;
+  private int[][] allSelectedMeasureBlocksIndexes;
 
   /**
    * this will be used to update the older block fixed length keys with the
@@ -407,28 +407,28 @@ public class BlockExecutionInfo {
   /**
    * @return the allSelectedDimensionBlocksIndexes
    */
-  public int[] getAllSelectedDimensionBlocksIndexes() {
+  public int[][] getAllSelectedDimensionBlocksIndexes() {
     return allSelectedDimensionBlocksIndexes;
   }
 
   /**
    * @param allSelectedDimensionBlocksIndexes the allSelectedDimensionBlocksIndexes to set
    */
-  public void setAllSelectedDimensionBlocksIndexes(int[] allSelectedDimensionBlocksIndexes) {
+  public void setAllSelectedDimensionBlocksIndexes(int[][] allSelectedDimensionBlocksIndexes) {
     this.allSelectedDimensionBlocksIndexes = allSelectedDimensionBlocksIndexes;
   }
 
   /**
    * @return the allSelectedMeasureBlocksIndexes
    */
-  public int[] getAllSelectedMeasureBlocksIndexes() {
+  public int[][] getAllSelectedMeasureBlocksIndexes() {
     return allSelectedMeasureBlocksIndexes;
   }
 
   /**
    * @param allSelectedMeasureBlocksIndexes the allSelectedMeasureBlocksIndexes to set
    */
-  public void setAllSelectedMeasureBlocksIndexes(int[] allSelectedMeasureBlocksIndexes) {
+  public void setAllSelectedMeasureBlocksIndexes(int[][] allSelectedMeasureBlocksIndexes) {
     this.allSelectedMeasureBlocksIndexes = allSelectedMeasureBlocksIndexes;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
index 8837012..680876c 100644
--- a/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/scan/executor/util/QueryUtil.java
@@ -211,16 +211,19 @@ public class QueryUtil {
    */
   public static int[] getDimensionsBlockIndexes(List<QueryDimension> queryDimensions,
       Map<Integer, Integer> dimensionOrdinalToBlockMapping,
-      List<CarbonDimension> customAggregationDimension) {
+      List<CarbonDimension> customAggregationDimension, Set<CarbonDimension> filterDimensions) {
     // using set as in row group columns will point to same block
     Set<Integer> dimensionBlockIndex = new HashSet<Integer>();
+    Set<Integer> filterDimensionOrdinal = getFilterDimensionOrdinal(filterDimensions);
     int blockIndex = 0;
     for (int i = 0; i < queryDimensions.size(); i++) {
-      blockIndex =
-          dimensionOrdinalToBlockMapping.get(queryDimensions.get(i).getDimension().getOrdinal());
-      dimensionBlockIndex.add(blockIndex);
-      if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
-        addChildrenBlockIndex(dimensionBlockIndex, queryDimensions.get(i).getDimension());
+      if (!filterDimensionOrdinal.contains(queryDimensions.get(i).getDimension().getOrdinal())) {
+        blockIndex =
+            dimensionOrdinalToBlockMapping.get(queryDimensions.get(i).getDimension().getOrdinal());
+        dimensionBlockIndex.add(blockIndex);
+        if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
+          addChildrenBlockIndex(dimensionBlockIndex, queryDimensions.get(i).getDimension());
+        }
       }
     }
     for (int i = 0; i < customAggregationDimension.size(); i++) {
@@ -230,8 +233,10 @@ public class QueryUtil {
       // is not push down in case of complex dimension
       dimensionBlockIndex.add(blockIndex);
     }
-    return ArrayUtils
+    int[] dimensionIndex = ArrayUtils
         .toPrimitive(dimensionBlockIndex.toArray(new Integer[dimensionBlockIndex.size()]));
+    Arrays.sort(dimensionIndex);
+    return dimensionIndex;
   }
 
   /**
@@ -252,15 +257,14 @@ public class QueryUtil {
    * Below method will be used to get the dictionary mapping for all the
    * dictionary encoded dimension present in the query
    *
-   * @param queryDimensions            query dimension present in the query this will be used to
-   *                                   convert the result from surrogate key to actual data
-   * @param absoluteTableIdentifier    absolute table identifier
+   * @param queryDimensions         query dimension present in the query this will be used to
+   *                                convert the result from surrogate key to actual data
+   * @param absoluteTableIdentifier absolute table identifier
    * @return dimension unique id to its dictionary map
    * @throws QueryExecutionException
    */
   public static Map<String, Dictionary> getDimensionDictionaryDetail(
-      List<QueryDimension> queryDimensions,
-      Set<CarbonDimension> filterComplexDimensions,
+      List<QueryDimension> queryDimensions, Set<CarbonDimension> filterComplexDimensions,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws QueryExecutionException {
     // to store dimension unique column id list, this is required as
     // dimension can be present in
@@ -387,16 +391,23 @@ public class QueryUtil {
    * @return block indexes
    */
   public static int[] getMeasureBlockIndexes(List<QueryMeasure> queryMeasures,
-      List<CarbonMeasure> expressionMeasure, Map<Integer, Integer> ordinalToBlockIndexMapping) {
+      List<CarbonMeasure> expressionMeasure, Map<Integer, Integer> ordinalToBlockIndexMapping,
+      Set<CarbonMeasure> filterMeasures) {
     Set<Integer> measureBlockIndex = new HashSet<Integer>();
+    Set<Integer> filterMeasureOrdinal = getFilterMeasureOrdinal(filterMeasures);
     for (int i = 0; i < queryMeasures.size(); i++) {
-      measureBlockIndex
-          .add(ordinalToBlockIndexMapping.get(queryMeasures.get(i).getMeasure().getOrdinal()));
+      if (!filterMeasureOrdinal.contains(queryMeasures.get(i).getMeasure().getOrdinal())) {
+        measureBlockIndex
+            .add(ordinalToBlockIndexMapping.get(queryMeasures.get(i).getMeasure().getOrdinal()));
+      }
     }
     for (int i = 0; i < expressionMeasure.size(); i++) {
       measureBlockIndex.add(ordinalToBlockIndexMapping.get(expressionMeasure.get(i).getOrdinal()));
     }
-    return ArrayUtils.toPrimitive(measureBlockIndex.toArray(new Integer[measureBlockIndex.size()]));
+    int[] measureIndexes =
+        ArrayUtils.toPrimitive(measureBlockIndex.toArray(new Integer[measureBlockIndex.size()]));
+    Arrays.sort(measureIndexes);
+    return measureIndexes;
   }
 
   /**
@@ -912,21 +923,19 @@ public class QueryUtil {
     return parentBlockIndex;
   }
 
-  public static Set<CarbonDimension> getAllFilterDimensions(FilterResolverIntf filterResolverTree) {
-    Set<CarbonDimension> filterDimensions = new HashSet<CarbonDimension>();
+  public static void getAllFilterDimensions(FilterResolverIntf filterResolverTree,
+      Set<CarbonDimension> filterDimensions, Set<CarbonMeasure> filterMeasure) {
     if (null == filterResolverTree) {
-      return filterDimensions;
+      return;
     }
     List<ColumnExpression> dimensionResolvedInfos = new ArrayList<ColumnExpression>();
     Expression filterExpression = filterResolverTree.getFilterExpression();
-    addColumnDimensions(filterExpression, filterDimensions);
+    addColumnDimensions(filterExpression, filterDimensions, filterMeasure);
     for (ColumnExpression info : dimensionResolvedInfos) {
       if (info.isDimension() && info.getDimension().getNumberOfChild() > 0) {
         filterDimensions.add(info.getDimension());
       }
     }
-    return filterDimensions;
-
   }
 
   /**
@@ -938,14 +947,53 @@ public class QueryUtil {
    * @return
    */
   private static void addColumnDimensions(Expression expression,
-      Set<CarbonDimension> filterDimensions) {
-    if (null != expression && expression instanceof ColumnExpression
-        && ((ColumnExpression) expression).isDimension()) {
-      filterDimensions.add(((ColumnExpression) expression).getDimension());
+      Set<CarbonDimension> filterDimensions, Set<CarbonMeasure> filterMeasure) {
+    if (null != expression && expression instanceof ColumnExpression) {
+      if (((ColumnExpression) expression).isDimension()) {
+        filterDimensions.add(((ColumnExpression) expression).getDimension());
+      } else {
+        filterMeasure.add((CarbonMeasure) ((ColumnExpression) expression).getCarbonColumn());
+      }
       return;
     }
     for (Expression child : expression.getChildren()) {
-      addColumnDimensions(child, filterDimensions);
+      addColumnDimensions(child, filterDimensions, filterMeasure);
+    }
+  }
+
+  private static Set<Integer> getFilterMeasureOrdinal(Set<CarbonMeasure> filterMeasures) {
+    Set<Integer> filterMeasuresOrdinal = new HashSet<>();
+    for (CarbonMeasure filterMeasure : filterMeasures) {
+      filterMeasuresOrdinal.add(filterMeasure.getOrdinal());
+    }
+    return filterMeasuresOrdinal;
+  }
+
+  private static Set<Integer> getFilterDimensionOrdinal(Set<CarbonDimension> filterDimensions) {
+    Set<Integer> filterDimensionsOrdinal = new HashSet<>();
+    for (CarbonDimension filterDimension : filterDimensions) {
+      filterDimensionsOrdinal.add(filterDimension.getOrdinal());
+      getChildDimensionOrdinal(filterDimension, filterDimensionsOrdinal);
+    }
+    return filterDimensionsOrdinal;
+  }
+
+  /**
+   * Below method will be used to fill the children dimension column id
+   *
+   * @param queryDimensions              query dimension
+   * @param dictionaryDimensionFromQuery dictionary dimension for query
+   */
+  private static void getChildDimensionOrdinal(CarbonDimension queryDimensions,
+      Set<Integer> filterDimensionsOrdinal) {
+    for (int j = 0; j < queryDimensions.numberOfChild(); j++) {
+      List<Encoding> encodingList = queryDimensions.getListOfChildDimensions().get(j).getEncoder();
+      if (queryDimensions.getListOfChildDimensions().get(j).numberOfChild() > 0) {
+        getChildDimensionOrdinal(queryDimensions.getListOfChildDimensions().get(j),
+            filterDimensionsOrdinal);
+      } else if (!CarbonUtil.hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY)) {
+        filterDimensionsOrdinal.add(queryDimensions.getListOfChildDimensions().get(j).getOrdinal());
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
index caee061..80e4837 100644
--- a/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
+++ b/core/src/main/java/org/apache/carbondata/scan/scanner/AbstractBlockletScanner.java
@@ -50,13 +50,12 @@ public abstract class AbstractBlockletScanner implements BlockletScanner {
 
   protected void fillKeyValue(BlocksChunkHolder blocksChunkHolder) {
     scannedResult.reset();
-    scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
-        .getMeasureChunks(blocksChunkHolder.getFileReader(),
-            blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
     scannedResult.setNumberOfRows(blocksChunkHolder.getDataBlock().nodeSize());
-
     scannedResult.setDimensionChunks(blocksChunkHolder.getDataBlock()
         .getDimensionChunks(blocksChunkHolder.getFileReader(),
             blockExecutionInfo.getAllSelectedDimensionBlocksIndexes()));
+    scannedResult.setMeasureChunks(blocksChunkHolder.getDataBlock()
+            .getMeasureChunks(blocksChunkHolder.getFileReader(),
+                blockExecutionInfo.getAllSelectedMeasureBlocksIndexes()));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java b/core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java
index 3aafd42..ac71100 100644
--- a/core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java
+++ b/core/src/main/java/org/apache/carbondata/scan/scanner/impl/FilterScanner.java
@@ -62,7 +62,7 @@ public class FilterScanner extends AbstractBlockletScanner {
   private QueryStatisticsModel queryStatisticsModel;
 
   public FilterScanner(BlockExecutionInfo blockExecutionInfo,
-                       QueryStatisticsModel queryStatisticsModel) {
+      QueryStatisticsModel queryStatisticsModel) {
     super(blockExecutionInfo);
     scannedResult = new FilterQueryScannedResult(blockExecutionInfo);
     // to check whether min max is enabled or not
@@ -116,8 +116,8 @@ public class FilterScanner extends AbstractBlockletScanner {
     scannedResult.reset();
     QueryStatistic totalBlockletStatistic = queryStatisticsModel.getStatisticsTypeAndObjMap()
         .get(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM);
-    totalBlockletStatistic.addCountStatistic(
-        QueryStatisticsConstants.TOTAL_BLOCKLET_NUM, totalBlockletStatistic.getCount() + 1);
+    totalBlockletStatistic.addCountStatistic(QueryStatisticsConstants.TOTAL_BLOCKLET_NUM,
+        totalBlockletStatistic.getCount() + 1);
     queryStatisticsModel.getRecorder().recordStatistics(totalBlockletStatistic);
     // apply min max
     if (isMinMaxEnabled) {
@@ -153,35 +153,41 @@ public class FilterScanner extends AbstractBlockletScanner {
     }
 
     FileHolder fileReader = blocksChunkHolder.getFileReader();
-    int[] allSelectedDimensionBlocksIndexes =
+    int[][] allSelectedDimensionBlocksIndexes =
         blockExecutionInfo.getAllSelectedDimensionBlocksIndexes();
+    DimensionColumnDataChunk[] projectionListDimensionChunk = blocksChunkHolder.getDataBlock()
+        .getDimensionChunks(fileReader, allSelectedDimensionBlocksIndexes);
+
     DimensionColumnDataChunk[] dimensionColumnDataChunk =
         new DimensionColumnDataChunk[blockExecutionInfo.getTotalNumberDimensionBlock()];
     // read dimension chunk blocks from file which is not present
+    for (int i = 0; i < dimensionColumnDataChunk.length; i++) {
+      if (null != blocksChunkHolder.getDimensionDataChunk()[i]) {
+        dimensionColumnDataChunk[i] = blocksChunkHolder.getDimensionDataChunk()[i];
+      }
+    }
     for (int i = 0; i < allSelectedDimensionBlocksIndexes.length; i++) {
-      if (null == blocksChunkHolder.getDimensionDataChunk()[allSelectedDimensionBlocksIndexes[i]]) {
-        dimensionColumnDataChunk[allSelectedDimensionBlocksIndexes[i]] =
-            blocksChunkHolder.getDataBlock()
-                .getDimensionChunk(fileReader, allSelectedDimensionBlocksIndexes[i]);
-      } else {
-        dimensionColumnDataChunk[allSelectedDimensionBlocksIndexes[i]] =
-            blocksChunkHolder.getDimensionDataChunk()[allSelectedDimensionBlocksIndexes[i]];
+      for (int j = allSelectedDimensionBlocksIndexes[i][0];
+           j <= allSelectedDimensionBlocksIndexes[i][1]; j++) {
+        dimensionColumnDataChunk[j] = projectionListDimensionChunk[j];
       }
     }
     MeasureColumnDataChunk[] measureColumnDataChunk =
         new MeasureColumnDataChunk[blockExecutionInfo.getTotalNumberOfMeasureBlock()];
-    int[] allSelectedMeasureBlocksIndexes = blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
-
+    int[][] allSelectedMeasureBlocksIndexes =
+        blockExecutionInfo.getAllSelectedMeasureBlocksIndexes();
+    MeasureColumnDataChunk[] projectionListMeasureChunk = blocksChunkHolder.getDataBlock()
+        .getMeasureChunks(fileReader, allSelectedMeasureBlocksIndexes);
     // read the measure chunk blocks which is not present
+    for (int i = 0; i < measureColumnDataChunk.length; i++) {
+      if (null != blocksChunkHolder.getMeasureDataChunk()[i]) {
+        measureColumnDataChunk[i] = blocksChunkHolder.getMeasureDataChunk()[i];
+      }
+    }
     for (int i = 0; i < allSelectedMeasureBlocksIndexes.length; i++) {
-
-      if (null == blocksChunkHolder.getMeasureDataChunk()[allSelectedMeasureBlocksIndexes[i]]) {
-        measureColumnDataChunk[allSelectedMeasureBlocksIndexes[i]] =
-            blocksChunkHolder.getDataBlock()
-                .getMeasureChunk(fileReader, allSelectedMeasureBlocksIndexes[i]);
-      } else {
-        measureColumnDataChunk[allSelectedMeasureBlocksIndexes[i]] =
-            blocksChunkHolder.getMeasureDataChunk()[allSelectedMeasureBlocksIndexes[i]];
+      for (int j = allSelectedMeasureBlocksIndexes[i][0];
+           j <= allSelectedMeasureBlocksIndexes[i][1]; j++) {
+        measureColumnDataChunk[j] = projectionListMeasureChunk[j];
       }
     }
     scannedResult.setDimensionChunks(dimensionColumnDataChunk);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
index 7aa50ad..eabf688 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
@@ -28,7 +28,7 @@ public class BlockInfoTest {
   static BlockInfo blockInfo;
 
   @BeforeClass public static void setup() {
-    blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6));
+    blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6,(short)1));
   }
 
   @Test public void hashCodeTest() {
@@ -44,7 +44,7 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithSimilarObject() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, (short)1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (res);
   }
@@ -61,28 +61,28 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithDifferentSegmentId() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6,(short)1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentOffset() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6));
+        new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, (short)1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentBlockLength() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, (short)1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62));
+        new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62, (short)1));
     Boolean res = blockInfoTest.equals(blockInfo);
     assert (!res);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
index 7c1bbed..1b49f83 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
@@ -34,8 +34,8 @@ public class TableBlockInfoTest {
   static TableBlockInfo tableBlockInfos;
 
   @BeforeClass public static void setup() {
-    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6);
-    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2));
+    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, (short) 1);
+    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), (short) 1);
   }
 
   @Test public void equalTestWithSameObject() {
@@ -44,7 +44,7 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equalTestWithSimilarObject() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, (short) 1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (res);
   }
@@ -60,52 +60,52 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equlsTestWithDiffSegmentId() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, (short) 1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equlsTestWithDiffBlockOffset() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, (short) 1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockLength() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, (short) 1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockletNumber() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2));
+        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2));
+        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void compareToTestForSegmentId() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2));
+        new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = 2;
     assertEquals(res, expectedResult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2));
+        new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = -1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfo2 =
-        new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2));
+        new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
     int res2 = tableBlockInfos.compareTo(tableBlockInfo2);
     int expectedresult2 = 1;
     assertEquals(res2, expectedresult2);
@@ -130,18 +130,18 @@ public class TableBlockInfoTest {
 
     };
 
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3);
+    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, (short) 1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = -5;
     assertEquals(res, expectedResult);
 
-    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3);
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, (short) 1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = 1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2));
+        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), (short) 1);
     int res2 = tableBlockInfos.compareTo(tableBlockInfoTest);
     int expectedResult2 = -1;
     assertEquals(res2, expectedResult2);
@@ -149,13 +149,13 @@ public class TableBlockInfoTest {
 
   @Test public void compareToTestWithStartBlockletNo() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2));
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedresult =-1;
     assertEquals(res, expectedresult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2));
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), (short) 1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedresult1 = 1;
     assertEquals(res1, expectedresult1);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
index 2d5750b..83b62a5 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
@@ -34,10 +34,10 @@ public class TableTaskInfoTest {
     tableBlockInfoList = new ArrayList<>(5);
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6));
+    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, (short) 1));
 
     String[] locs = { "loc4", "loc5" };
-    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6));
+    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, (short) 1));
 
     tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList);
   }
@@ -68,10 +68,10 @@ public class TableTaskInfoTest {
     List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>();
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6));
+    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, (short) 1));
 
     String[] locations1 = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6));
+    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, (short) 1));
 
     List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest);
     assert (res.equals(locs));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java
index 364a393..e1e4088 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java
@@ -19,10 +19,17 @@
 
 package org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension;
 
+import static junit.framework.TestCase.assertEquals;
+
 import java.util.ArrayList;
 import java.util.List;
 
+import mockit.Mock;
+import mockit.MockUp;
+
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
@@ -30,17 +37,12 @@ import org.apache.carbondata.core.datastorage.store.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastorage.store.compression.SnappyCompression;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.util.CarbonUtil;
-
-import mockit.Mock;
-import mockit.MockUp;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static junit.framework.TestCase.assertEquals;
-
 public class CompressedDimensionChunkFileBasedReaderTest {
 
-  static CompressedDimensionChunkFileBasedReader compressedDimensionChunkFileBasedReader;
+  static CompressedDimensionChunkFileBasedReaderV1 compressedDimensionChunkFileBasedReader;
   static List<DataChunk> dataChunkList;
 
   @BeforeClass public static void setup() {
@@ -49,9 +51,10 @@ public class CompressedDimensionChunkFileBasedReaderTest {
 
     DataChunk dataChunk = new DataChunk();
     dataChunkList.add(dataChunk);
-
+    BlockletInfo info = new BlockletInfo();
+    info.setDimensionColumnChunk(dataChunkList);
     compressedDimensionChunkFileBasedReader =
-        new CompressedDimensionChunkFileBasedReader(dataChunkList, eachColumnBlockSize, "filePath");
+        new CompressedDimensionChunkFileBasedReaderV1(info, eachColumnBlockSize, "filePath");
   }
 
   @Test public void readDimensionChunksTest() {
@@ -88,7 +91,7 @@ public class CompressedDimensionChunkFileBasedReaderTest {
       }
     };
 
-    int blockIndexes[] = { 0 };
+    int[][] blockIndexes = {{0,0}};
     DimensionColumnDataChunk dimensionColumnDataChunk[] =
         compressedDimensionChunkFileBasedReader.readDimensionChunks(fileHolder, blockIndexes);
     byte expectedResult[] = { 1 };
@@ -137,7 +140,7 @@ public class CompressedDimensionChunkFileBasedReaderTest {
         return true;
       }
     };
-    int blockIndexes[] = { 0 };
+    int[][] blockIndexes = {{0,0}};
     DimensionColumnDataChunk dimensionColumnDataChunk[] =
         compressedDimensionChunkFileBasedReader.readDimensionChunks(fileHolder, blockIndexes);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
index ea4eb23..e7342b1 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
@@ -1,27 +1,30 @@
 package org.apache.carbondata.core.carbon.datastore.chunk.reader.measure;
 
+import static junit.framework.TestCase.assertEquals;
+
 import java.util.ArrayList;
 import java.util.List;
 
+import mockit.Mock;
+import mockit.MockUp;
+
 import org.apache.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
+import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
+import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.datachunk.DataChunk;
 import org.apache.carbondata.core.datastorage.store.FileHolder;
 import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
 import org.apache.carbondata.core.datastorage.store.compression.ValueCompressonHolder;
 import org.apache.carbondata.core.datastorage.store.compression.type.UnCompressByteArray;
 import org.apache.carbondata.core.datastorage.store.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-import mockit.Mock;
-import mockit.MockUp;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static junit.framework.TestCase.assertEquals;
-
 public class CompressedMeasureChunkFileBasedReaderTest {
 
-  static CompressedMeasureChunkFileBasedReader compressedMeasureChunkFileBasedReader;
+  static CompressedMeasureChunkFileBasedReaderV1 compressedMeasureChunkFileBasedReader;
 
   @BeforeClass public static void setup() {
     List<DataChunk> dataChunkList = new ArrayList<>();
@@ -41,9 +44,18 @@ public class CompressedMeasureChunkFileBasedReaderTest {
     valueCompressionModel.setDecimal(decimal);
     Object maxValue[] = { 8 };
     valueCompressionModel.setMaxValue(maxValue);
-
+    ValueEncoderMeta meta = new ValueEncoderMeta();
+    meta.setMaxValue(8.0);
+    meta.setMinValue(1.0);
+    meta.setDecimal(1);
+    meta.setType('b');
+    List<ValueEncoderMeta> valueEncoderMetaList = new ArrayList<>();
+    valueEncoderMetaList.add(meta);
+    dataChunkList.get(0).setValueEncoderMeta(valueEncoderMetaList);
+    BlockletInfo info = new BlockletInfo();
+    info.setMeasureColumnChunk(dataChunkList);
     compressedMeasureChunkFileBasedReader =
-        new CompressedMeasureChunkFileBasedReader(dataChunkList, valueCompressionModel, "filePath");
+        new CompressedMeasureChunkFileBasedReaderV1(info, "filePath");
   }
 
   @Test public void readMeasureChunkTest() {
@@ -98,7 +110,7 @@ public class CompressedMeasureChunkFileBasedReaderTest {
       }
     };
 
-    int blockIndexes[] = { 0 };
+    int[][] blockIndexes = {{0,0}};
     MeasureColumnDataChunk measureColumnDataChunks[] =
         compressedMeasureChunkFileBasedReader.readMeasureChunks(fileHolder, blockIndexes);
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 39123e2..d959a5c 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.carbon.metadata.blocklet.index.*;
 import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.carbon.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.*;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -34,7 +35,6 @@ import org.apache.carbondata.format.*;
 import org.apache.carbondata.format.BlockletBTreeIndex;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ColumnSchema;
-
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -166,6 +166,7 @@ public class CarbonMetadataUtilTest {
     segmentInfo.setNum_cols(0);
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     IndexHeader indexHeader = new IndexHeader();
+    indexHeader.setVersion(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
     IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 54910d8..c0d890c 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -20,6 +20,8 @@ package org.apache.carbondata.core.util;
 
 import mockit.Mock;
 import mockit.MockUp;
+
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -41,11 +43,13 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.pentaho.di.core.exception.KettleException;
+
 import java.io.*;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static junit.framework.TestCase.*;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -549,19 +553,21 @@ public class CarbonUtilTest {
   @Test public void testToReadMetadatFile() throws CarbonUtilException {
     new MockUp<DataFileFooterConverter>() {
       @SuppressWarnings("unused") @Mock
-      public DataFileFooter readDataFileFooter(String filePath, long blockOffset,
-          long blockLength) {
+      public DataFileFooter readDataFileFooter(TableBlockInfo info) {
         DataFileFooter fileFooter = new DataFileFooter();
-        fileFooter.setVersionId(1);
+        fileFooter.setVersionId((short)1);
         return fileFooter;
       }
     };
-    assertEquals(CarbonUtil.readMetadatFile("", 1L, 1L).getVersionId(), 1);
+    TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, (short)1);
+    
+    assertEquals(CarbonUtil.readMetadatFile(info).getVersionId(), 1);
   }
 
   @Test(expected = CarbonUtilException.class) public void testToReadMetadatFileWithException()
       throws Exception {
-    CarbonUtil.readMetadatFile("", 1L, 1L);
+	TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, (short)1);
+    CarbonUtil.readMetadatFile(info);
   }
 
   @Test public void testToFindDimension() {
@@ -695,7 +701,7 @@ public class CarbonUtilTest {
     dataChunk.setValueEncoderMeta(valueEncoderMetas);
     dataChunkList.add(dataChunk);
     ValueCompressionModel valueCompressionModel =
-        CarbonUtil.getValueCompressionModel(dataChunkList);
+        CarbonUtil.getValueCompressionModel(dataChunkList.get(0).getValueEncoderMeta());
     assertEquals(1, valueCompressionModel.getMaxValue().length);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index 1030d90..62d1ac7 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -152,7 +152,7 @@ public class DataFileFooterConverterTest {
       }
     };
     String[] arr = { "a", "b", "c" };
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3);
+    TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3, (short) 1);
     tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3);
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
     tableBlockInfoList.add(tableBlockInfo);
@@ -214,9 +214,9 @@ public class DataFileFooterConverterTest {
         new org.apache.carbondata.format.BlockletInfo();
     List<org.apache.carbondata.format.BlockletInfo> blockletInfoArrayList = new ArrayList<>();
     blockletInfoArrayList.add(blockletInfo);
-    final FileFooter fileFooter =
-        new FileFooter(1, 3, columnSchemas, segmentInfo1, blockletIndexArrayList,
-            blockletInfoArrayList);
+    final FileFooter fileFooter = 
+        new FileFooter(1, 3, columnSchemas, segmentInfo1, blockletIndexArrayList);
+    fileFooter.setBlocklet_info_list(blockletInfoArrayList);
     BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
     blockletBTreeIndex.setStart_key("1".getBytes());
     blockletBTreeIndex.setEnd_key("3".getBytes());
@@ -254,7 +254,8 @@ public class DataFileFooterConverterTest {
     segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
     dataFileFooter.setNumberOfRows(3);
     dataFileFooter.setSegmentInfo(segmentInfo);
-    DataFileFooter result = dataFileFooterConverter.readDataFileFooter("file", 1, 1);
+    TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, (short)1);
+    DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info);
     assertEquals(result.getNumberOfRows(), 3);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index dd4514d..759fbf7 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -113,6 +113,22 @@ struct DataChunk{
     13: optional list<binary> encoder_meta; // extra information required by encoders
 }
 
+/**
+* Represents a chunk of data. The chunk can be a single column stored in Column Major format or a group of columns stored in Row Major Format.
+**/
+struct DataChunk2{
+    1: required ChunkCompressionMeta chunk_meta; // the metadata of a chunk
+    2: required bool rowMajor; // whether this chunk is a row chunk or column chunk ? Decide whether this can be replace with counting od columnIDs
+	/** The column IDs in this chunk, in the order in which the data is physically stored, will have atleast one column ID for columnar format, many column ID for row major format**/
+    3: required i32 data_page_length; // length of data page
+    4: optional i32 rowid_page_length; //length of row id page, only if encoded using inverted index
+    5: optional i32 rle_page_length;	// length of rle page, only if RLE coded.
+    6: optional PresenceMeta presence; // information about presence of values in each row of this column chunk
+    7: optional SortState sort_state;
+    8: optional list<schema.Encoding> encoders; // The List of encoders overriden at node level
+    9: optional list<binary> encoder_meta; // extra information required by encoders
+}
+
 
 /**
 *	Information about a blocklet
@@ -123,6 +139,15 @@ struct BlockletInfo{
 }
 
 /**
+*	Information about a blocklet
+*/
+struct BlockletInfo2{
+    1: required i32 num_rows;	// Number of rows in this blocklet
+    2: required list<i64> column_data_chunks_offsets;	// Information about offsets all column chunks in this blocklet
+    3: required list<i16> column_data_chunks_length;	// Information about length all column chunks in this blocklet
+}
+
+/**
 * Footer for indexed carbon file
 */
 struct FileFooter{
@@ -131,8 +156,9 @@ struct FileFooter{
     3: required list<schema.ColumnSchema> table_columns;	// Description of columns in this file
     4: required SegmentInfo segment_info;	// Segment info (will be same/repeated for all files in this segment)
     5: required list<BlockletIndex> blocklet_index_list;	// blocklet index of all blocklets in this file
-    6: required list<BlockletInfo> blocklet_info_list;	// Information about blocklets of all columns in this file
-    7: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
+    6: optional list<BlockletInfo> blocklet_info_list;	// Information about blocklets of all columns in this file
+    7: optional list<BlockletInfo2> blocklet_info_list2;	// Information about blocklets of all columns in this file
+    8: optional dictionary.ColumnDictionaryChunk dictionary; // blocklet local dictionary
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index a44a78c..8b453c7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.util.StringUtils;
 
+
 /**
  * Carbon Input format class representing one carbon table
  */
@@ -180,8 +181,17 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * Set list of segments to access
    */
   public static void setSegmentsToAccess(Configuration configuration, List<String> validSegments) {
-    configuration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
-        CarbonUtil.getSegmentString(validSegments));
+    configuration
+        .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
+  }
+
+  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) {
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    }
+    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
   }
 
   /**
@@ -193,8 +203,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * @return List<InputSplit> list of CarbonInputSplit
    * @throws IOException
    */
-  @Override
-  public List<InputSplit> getSplits(JobContext job) throws IOException {
+  @Override public List<InputSplit> getSplits(JobContext job) throws IOException {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     List<String> invalidSegments = new ArrayList<>();
 
@@ -245,7 +254,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
         continue;
       }
-      carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit));
+      carbonSplits.add(CarbonInputSplit
+          .from(segmentId, fileSplit, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION));
     }
     return carbonSplits;
   }
@@ -278,21 +288,13 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
         result.add(new CarbonInputSplit(segmentNo, new Path(tableBlockInfo.getFilePath()),
             tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-            tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets()));
+            tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+            tableBlockInfo.getVersion()));
       }
     }
     return result;
   }
 
-  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
-    }
-    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
-  }
-
   private Expression getFilterPredicates(Configuration configuration) {
     try {
       String filterExprString = configuration.get(FILTER_PREDICATE);
@@ -313,8 +315,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       FilterExpressionProcessor filterExpressionProcessor,
       AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
       String segmentId) throws IndexBuilderException, IOException {
-    QueryStatisticsRecorder recorder =
-            CarbonTimeStatisticsFactory.createDriverRecorder();
+    QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
     Map<String, AbstractIndex> segmentIndexMap =
         getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId);
@@ -340,8 +341,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       }
       resultFilterredBlocks.addAll(filterredBlocks);
     }
-    statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER,
-        System.currentTimeMillis());
+    statistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
     recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
     return resultFilterredBlocks;
   }
@@ -349,8 +350,8 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   /**
    * Below method will be used to get the table block info
    *
-   * @param job                     job context
-   * @param segmentId               number of segment id
+   * @param job       job context
+   * @param segmentId number of segment id
    * @return list of table block
    * @throws IOException
    */
@@ -371,7 +372,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       tableBlockInfoList.add(
           new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
               segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
-              blockletInfos));
+              blockletInfos, carbonInputSplit.getVersion()));
     }
     return tableBlockInfoList;
   }
@@ -384,8 +385,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
     // if segment tree is not loaded, load the segment tree
     if (segmentIndexMap == null) {
-      List<TableBlockInfo> tableBlockInfoList =
-          getTableBlockInfo(job, segmentId);
+      List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job, segmentId);
 
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
       segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
@@ -428,8 +428,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return blocks;
   }
 
-  @Override
-  public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+  @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getCarbonTable(configuration);
@@ -482,18 +481,15 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return readSupport;
   }
 
-  @Override
-  protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
+  @Override protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
     return super.computeSplitSize(blockSize, minSize, maxSize);
   }
 
-  @Override
-  protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+  @Override protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
     return super.getBlockIndex(blkLocations, offset);
   }
 
-  @Override
-  protected List<FileStatus> listStatus(JobContext job) throws IOException {
+  @Override protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> result = new ArrayList<FileStatus>();
     String[] segmentsToConsider = getSegmentsToAccess(job);
     if (segmentsToConsider.length == 0) {
@@ -504,13 +500,11 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return result;
   }
 
-  @Override
-  protected boolean isSplitable(JobContext context, Path filename) {
+  @Override protected boolean isSplitable(JobContext context, Path filename) {
     try {
       // Don't split the file if it is local file system
       FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
-      if (fileSystem instanceof LocalFileSystem)
-      {
+      if (fileSystem instanceof LocalFileSystem) {
         return false;
       }
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 132ee43..efc4f77 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
 import org.apache.hadoop.fs.Path;
@@ -38,13 +39,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 /**
  * Carbon input split to allow distributed read of CarbonInputFormat.
  */
-public class CarbonInputSplit extends FileSplit implements Distributable, Serializable, Writable,
-    Block {
+public class CarbonInputSplit extends FileSplit
+    implements Distributable, Serializable, Writable, Block {
 
   private static final long serialVersionUID = 3520344046772190207L;
-  private String segmentId;
   public String taskId;
-
+  private String segmentId;
   /*
    * Invalid segments that need to be removed in task side index
    */
@@ -55,40 +55,72 @@ public class CarbonInputSplit extends FileSplit implements Distributable, Serial
    */
   private int numberOfBlocklets;
 
-  public  CarbonInputSplit() {
+  private short version = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+
+  public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
   }
 
-  private CarbonInputSplit(String segmentId, Path path, long start, long length,
-      String[] locations) {
+  private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
+      short version) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
     this.invalidSegments = new ArrayList<>();
+    this.version = version;
   }
 
-  public CarbonInputSplit(String segmentId, Path path, long start, long length,
-      String[] locations, int numberOfBlocklets) {
-    this(segmentId, path, start, length, locations);
+  public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
+      int numberOfBlocklets, short version) {
+    this(segmentId, path, start, length, locations, version);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
-  public static CarbonInputSplit from(String segmentId, FileSplit split) throws IOException {
+  public static CarbonInputSplit from(String segmentId, FileSplit split, short version)
+      throws IOException {
     return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
-        split.getLocations());
+        split.getLocations(), version);
+  }
+
+  public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+    for (CarbonInputSplit split : splitList) {
+      BlockletInfos blockletInfos =
+          new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
+      try {
+        tableBlockInfoList.add(
+            new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
+                split.getLocations(), split.getLength(), blockletInfos, split.getVersion()));
+      } catch (IOException e) {
+        throw new RuntimeException("fail to get location of split: " + split, e);
+      }
+    }
+    return tableBlockInfoList;
+  }
+
+  public static TableBlockInfo getTableBlockInfo(CarbonInputSplit inputSplit) {
+    BlockletInfos blockletInfos =
+        new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
+    try {
+      return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
+          inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
+          blockletInfos, inputSplit.getVersion());
+    } catch (IOException e) {
+      throw new RuntimeException("fail to get location of split: " + inputSplit, e);
+    }
   }
 
   public String getSegmentId() {
     return segmentId;
   }
 
-  @Override
-  public void readFields(DataInput in) throws IOException {
+  @Override public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.segmentId = in.readUTF();
+    this.version = in.readShort();
     int numInvalidSegment = in.readInt();
     invalidSegments = new ArrayList<>(numInvalidSegment);
     for (int i = 0; i < numInvalidSegment; i++) {
@@ -96,17 +128,17 @@ public class CarbonInputSplit extends FileSplit implements Distributable, Serial
     }
   }
 
-  @Override
-  public void write(DataOutput out) throws IOException {
+  @Override public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeUTF(segmentId);
+    out.writeShort(version);
     out.writeInt(invalidSegments.size());
-    for (String invalidSegment: invalidSegments) {
+    for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
     }
   }
 
-  public List<String> getInvalidSegments(){
+  public List<String> getInvalidSegments() {
     return invalidSegments;
   }
 
@@ -116,15 +148,23 @@ public class CarbonInputSplit extends FileSplit implements Distributable, Serial
 
   /**
    * returns the number of blocklets
+   *
    * @return
    */
   public int getNumberOfBlocklets() {
     return numberOfBlocklets;
   }
 
-  @Override
-  public int compareTo(Distributable o) {
-    CarbonInputSplit other = (CarbonInputSplit)o;
+  public short getVersion() {
+    return version;
+  }
+
+  public void setVersion(short version) {
+    this.version = version;
+  }
+
+  @Override public int compareTo(Distributable o) {
+    CarbonInputSplit other = (CarbonInputSplit) o;
     int compareResult = 0;
     // get the segment id
     // converr seg ID to double.
@@ -163,34 +203,15 @@ public class CarbonInputSplit extends FileSplit implements Distributable, Serial
     return 0;
   }
 
-  public static List<TableBlockInfo> createBlocks(List<CarbonInputSplit> splitList) {
-    List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
-    for (CarbonInputSplit split : splitList) {
-      BlockletInfos blockletInfos = new BlockletInfos(split.getNumberOfBlocklets(), 0,
-          split.getNumberOfBlocklets());
-      try {
-        tableBlockInfoList.add(
-            new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
-                split.getLocations(), split.getLength(), blockletInfos));
-      } catch (IOException e) {
-        throw new RuntimeException("fail to get location of split: " + split, e);
-      }
-    }
-    return tableBlockInfoList;
-  }
-
-  @Override
-  public String getBlockPath() {
+  @Override public String getBlockPath() {
     return getPath().getName();
   }
 
-  @Override
-  public List<Long> getMatchedBlocklets() {
+  @Override public List<Long> getMatchedBlocklets() {
     return null;
   }
 
-  @Override
-  public boolean fullScan() {
+  @Override public boolean fullScan() {
     return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
index dbc71ec..c238e10 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java
@@ -94,7 +94,8 @@ class InMemoryBTreeIndex implements Index {
       TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
       result.add(new CarbonInputSplit(segment.getId(), new Path(tableBlockInfo.getFilePath()),
           tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
-          tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets()));
+          tableBlockInfo.getLocations(), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+          tableBlockInfo.getVersion()));
     }
     return result;
   }
@@ -138,7 +139,7 @@ class InMemoryBTreeIndex implements Index {
       tableBlockInfoList.add(
           new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
               segment.getId(), carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
-              blockletInfos));
+              blockletInfos, carbonInputSplit.getVersion()));
     }
     return tableBlockInfoList;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
index 753f485..404dd1a 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
@@ -113,14 +113,12 @@ public class CarbonCompactionUtil {
     for (TableBlockInfo blockInfo : tableBlockInfoList) {
       List<DataFileFooter> eachSegmentBlocks = new ArrayList<>();
       String segId = blockInfo.getSegmentId();
-
       DataFileFooter dataFileMatadata = null;
       // check if segId is already present in map
       List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
       try {
         dataFileMatadata = CarbonUtil
-            .readMetadatFile(blockInfo.getFilePath(), blockInfo.getBlockOffset(),
-                blockInfo.getBlockLength());
+            .readMetadatFile(blockInfo);
       } catch (CarbonUtilException e) {
         throw new IndexBuilderException(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index c9e3b6b..93e1590 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -250,8 +250,8 @@ class CarbonMergerRDD[K, V](
       var dataFileFooter: DataFileFooter = null
 
       try {
-        dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
-          carbonInputSplit.getStart, carbonInputSplit.getLength)
+        dataFileFooter = CarbonUtil.readMetadatFile(
+            CarbonInputSplit.getTableBlockInfo(carbonInputSplit))
       } catch {
         case e: CarbonUtilException =>
           logError("Exception in preparing the data file footer for compaction " + e.getMessage)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index a750493..1801408 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -805,7 +805,7 @@ object CarbonDataRDDFactory {
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
               fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength
+              fileSplit.getLocations, fileSplit.getLength, 0
             ).asInstanceOf[Distributable]
           }
           )

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/integration/spark/src/test/resources/OLDFORMATTABLE.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/OLDFORMATTABLE.csv b/integration/spark/src/test/resources/OLDFORMATTABLE.csv
new file mode 100644
index 0000000..12a6fe8
--- /dev/null
+++ b/integration/spark/src/test/resources/OLDFORMATTABLE.csv
@@ -0,0 +1,34 @@
+country,name,phonetype,serialname,salary
+china,aaa1,phone197,ASD69643,15000
+china,aaa2,phone756,ASD42892,15001
+china,aaa3,phone1904,ASD37014,15002
+china,aaa4,phone2435,ASD66902,15003
+china,aaa5,phone2441,ASD90633,15004
+china,aaa6,phone294,ASD59961,15005
+china,aaa7,phone610,ASD14875,15006
+china,aaa8,phone1848,ASD57308,15007
+china,aaa9,phone706,ASD86717,15008
+usa,aaa10,phone685,ASD30505,15009
+china,aaa11,phone1554,ASD26101,15010
+china,aaa12,phone1781,ASD85711,15011
+china,aaa13,phone943,ASD39200,15012
+china,aaa14,phone1954,ASD80468,15013
+china,aaa15,phone451,ASD1954,15014
+china,aaa16,phone390,ASD38513,15015
+china,aaa17,phone1929,ASD86213,15016
+usa,aaa18,phone910,ASD88812,15017
+china,aaa19,phone2151,ASD9316,15018
+china,aaa20,phone2625,ASD62597,15019
+china,aaa21,phone1371,ASD27896,15020
+china,aaa22,phone945,ASD79760,15021
+china,aaa23,phone2177,ASD45410,15022
+china,aaa24,phone1586,ASD80645,15023
+china,aaa25,phone1310,ASD36408,15024
+china,aaa26,phone1579,ASD14571,15025
+china,aaa27,phone2123,ASD36243,15026
+china,aaa28,phone2334,ASD57825,15027
+china,aaa29,phone1166,ASD26161,15028
+china,aaa30,phone2248,ASD47899,15029
+china,aaa31,phone475,ASD89811,15030
+china,aaa32,phone2499,ASD87974,15031
+china,aaa33,phone2333,ASD62408,15032
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/integration/spark/src/test/resources/OLDFORMATTABLEHIVE.csv
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/OLDFORMATTABLEHIVE.csv b/integration/spark/src/test/resources/OLDFORMATTABLEHIVE.csv
new file mode 100644
index 0000000..55642aa
--- /dev/null
+++ b/integration/spark/src/test/resources/OLDFORMATTABLEHIVE.csv
@@ -0,0 +1,33 @@
+china,aaa1,phone197,ASD69643,15000
+china,aaa2,phone756,ASD42892,15001
+china,aaa3,phone1904,ASD37014,15002
+china,aaa4,phone2435,ASD66902,15003
+china,aaa5,phone2441,ASD90633,15004
+china,aaa6,phone294,ASD59961,15005
+china,aaa7,phone610,ASD14875,15006
+china,aaa8,phone1848,ASD57308,15007
+china,aaa9,phone706,ASD86717,15008
+usa,aaa10,phone685,ASD30505,15009
+china,aaa11,phone1554,ASD26101,15010
+china,aaa12,phone1781,ASD85711,15011
+china,aaa13,phone943,ASD39200,15012
+china,aaa14,phone1954,ASD80468,15013
+china,aaa15,phone451,ASD1954,15014
+china,aaa16,phone390,ASD38513,15015
+china,aaa17,phone1929,ASD86213,15016
+usa,aaa18,phone910,ASD88812,15017
+china,aaa19,phone2151,ASD9316,15018
+china,aaa20,phone2625,ASD62597,15019
+china,aaa21,phone1371,ASD27896,15020
+china,aaa22,phone945,ASD79760,15021
+china,aaa23,phone2177,ASD45410,15022
+china,aaa24,phone1586,ASD80645,15023
+china,aaa25,phone1310,ASD36408,15024
+china,aaa26,phone1579,ASD14571,15025
+china,aaa27,phone2123,ASD36243,15026
+china,aaa28,phone2334,ASD57825,15027
+china,aaa29,phone1166,ASD26161,15028
+china,aaa30,phone2248,ASD47899,15029
+china,aaa31,phone475,ASD89811,15030
+china,aaa32,phone2499,ASD87974,15031
+china,aaa33,phone2333,ASD62408,15032
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
new file mode 100644
index 0000000..431180c
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.allqueries
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/*
+ * Test Class for query without data load
+ *
+ */
+class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
+  override def beforeAll {
+	  CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");         
+    sql("drop table if exists OldFormatTable")
+    sql("drop table if exists OldFormatTableHIVE")
+     sql("""
+           CREATE TABLE IF NOT EXISTS OldFormatTable
+           (country String,
+           name String, phonetype String, serialname String, salary Int)
+           STORED BY 'carbondata'
+           """)
+      sql("""
+           CREATE TABLE IF NOT EXISTS OldFormatTableHIVE
+           (country String,
+           name String, phonetype String, serialname String, salary Int)
+          row format delimited fields terminated by ','
+           """)      
+    sql("LOAD DATA local inpath './src/test/resources/OLDFORMATTABLE.csv' INTO table OldFormatTable");       
+   sql(s"""
+           LOAD DATA LOCAL INPATH './src/test/resources/OLDFORMATTABLEHIVE.csv' into table OldFormatTableHIVE
+           """)
+
+  }
+
+  CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "2");
+  test("Test select * query") {
+    checkAnswer(
+      sql("select * from OldFormatTable"), sql("select * from OldFormatTableHIVE")
+    )
+  }
+
+  override def afterAll {
+     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+    sql("drop table if exists OldFormatTable")
+    sql("drop table if exists OldFormatTableHIVE")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
new file mode 100644
index 0000000..2fbb00e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store;
+
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
+import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImpl2;
+import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImplForIntIndexAndAggBlock;
+
+/**
+ * Factory class to get the writer instance
+ */
+public class CarbonDataWriterFactory {
+
+  /**
+   * static instance
+   */
+  private static final CarbonDataWriterFactory CARBON_DATA_WRITER_FACTORY =
+      new CarbonDataWriterFactory();
+
+  /**
+   * private constructor
+   */
+  private CarbonDataWriterFactory() {
+    // TODO Auto-generated constructor stub
+  }
+
+  /**
+   * Below method will be used to get the instance of factory class
+   *
+   * @return fact class instance
+   */
+  public static CarbonDataWriterFactory getInstance() {
+    return CARBON_DATA_WRITER_FACTORY;
+  }
+
+  /**
+   * Below method will be used to get the writer instance based on version
+   *
+   * @param version            writer version
+   * @param carbonDataWriterVo writer vo object
+   * @return writer instance
+   */
+  public CarbonFactDataWriter<?> getFactDataWriter(final short version,
+      final CarbonDataWriterVo carbonDataWriterVo) {
+    switch (version) {
+      case 2:
+        return new CarbonFactDataWriterImpl2(carbonDataWriterVo);
+      default:
+        return new CarbonFactDataWriterImplForIntIndexAndAggBlock(carbonDataWriterVo);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d54dc647/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index f3bd484..1fa14f0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -70,8 +70,8 @@ import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
 import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
 import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
 import org.apache.carbondata.processing.store.colgroup.DataHolder;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImplForIntIndexAndAggBlock;
 import org.apache.carbondata.processing.store.writer.NodeHolder;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
@@ -1306,8 +1306,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
         blockKeySize.length - noOfColStore);
     this.dataWriter =
-        getFactDataWriter(this.storeLocation, this.measureCount, this.mdkeyLength, this.tableName,
-            fileManager, keyBlockSize);
+        getFactDataWriter(keyBlockSize);
     this.dataWriter.setIsNoDictionary(isNoDictionary);
     // initialize the channel;
     this.dataWriter.initializeWriter();
@@ -1396,13 +1395,45 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return nullvalueIndexBitset;
   }
 
-  private CarbonFactDataWriter<?> getFactDataWriter(String storeLocation, int measureCount,
-      int mdKeyLength, String tableName, IFileManagerComposite fileManager, int[] keyBlockSize) {
-    return new CarbonFactDataWriterImplForIntIndexAndAggBlock(storeLocation, measureCount,
-        mdKeyLength, tableName, fileManager, keyBlockSize, aggKeyBlock, isComplexTypes(),
-        noDictionaryCount, carbonDataFileAttributes, databaseName, wrapperColumnSchemaList,
-        noDictionaryCount, dimensionType, carbonDataDirectoryPath, colCardinality,
-        segmentProperties, tableBlockSize);
+  /**
+   * Below method will be used to get the fact data writer instance
+   *
+   * @param keyBlockSize
+   * @return data writer instance
+   */
+  private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
+    short version = Short.parseShort(
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+    return CarbonDataWriterFactory.getInstance()
+        .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
+  }
+
+  /**
+   * Below method will be used to get the writer vo
+   *
+   * @param keyBlockSize size of each key block
+   * @return data writer vo object
+   */
+  private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) {
+    CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo();
+    carbonDataWriterVo.setStoreLocation(storeLocation);
+    carbonDataWriterVo.setMeasureCount(measureCount);
+    carbonDataWriterVo.setMdKeyLength(mdkeyLength);
+    carbonDataWriterVo.setTableName(tableName);
+    carbonDataWriterVo.setKeyBlockSize(keyBlockSize);
+    carbonDataWriterVo.setFileManager(fileManager);
+    carbonDataWriterVo.setAggBlocks(aggKeyBlock);
+    carbonDataWriterVo.setIsComplexType(isComplexTypes());
+    carbonDataWriterVo.setNoDictionaryCount(noDictionaryCount);
+    carbonDataWriterVo.setCarbonDataFileAttributes(carbonDataFileAttributes);
+    carbonDataWriterVo.setDatabaseName(databaseName);
+    carbonDataWriterVo.setWrapperColumnSchemaList(wrapperColumnSchemaList);
+    carbonDataWriterVo.setIsDictionaryColumn(dimensionType);
+    carbonDataWriterVo.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
+    carbonDataWriterVo.setColCardinality(colCardinality);
+    carbonDataWriterVo.setSegmentProperties(segmentProperties);
+    carbonDataWriterVo.setTableBlocksize(tableBlockSize);
+    return carbonDataWriterVo;
   }
 
   private boolean[] isComplexTypes() {
@@ -1571,7 +1602,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           if (!processingComplete || blockletProcessingCount.get() > 0) {
             producerExecutorService.shutdownNow();
             resetBlockletProcessingCount();
-            throw new CarbonDataWriterException(throwable.getMessage(), throwable);
+            LOGGER.error(throwable, "Problem while writing the carbon data file");
+            throw new CarbonDataWriterException(throwable.getMessage());
           }
         } finally {
           semaphore.release();