Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/12/01 16:35:44 UTC

[1/3] incubator-carbondata git commit: add file format version enum

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 90bc36699 -> 5406cee1b


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
deleted file mode 100644
index 8c2608b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.processing.store.writer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
-import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
-import org.apache.carbondata.format.FileFooter;
-import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-
-public class CarbonFactDataWriterImplForIntIndexAndAggBlock extends AbstractFactDataWriter<int[]> {
-
-  private static final LogService LOGGER = LogServiceFactory
-      .getLogService(CarbonFactDataWriterImplForIntIndexAndAggBlock.class.getName());
-
-  public CarbonFactDataWriterImplForIntIndexAndAggBlock(CarbonDataWriterVo dataWriterVo) {
-    super(dataWriterVo);
-  }
-
-  @Override
-  public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
-      int entryCount, byte[] startKey, byte[] endKey, ValueCompressionModel compressionModel,
-      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey) throws CarbonDataWriterException {
-    // if there are no NO-Dictionary columns present in the table then
-    // set an empty byte array
-    if (null == noDictionaryEndKey) {
-      noDictionaryEndKey = new byte[0];
-    }
-    if (null == noDictionaryStartKey) {
-      noDictionaryStartKey = new byte[0];
-    }
-    // total measure length;
-    int totalMsrArrySize = 0;
-    // current measure length;
-    int currentMsrLenght = 0;
-    int totalKeySize = 0;
-    int keyBlockSize = 0;
-
-    boolean[] isSortedData = new boolean[keyStorageArray.length];
-    int[] keyLengths = new int[keyStorageArray.length];
-
-    // below will calculate the min and max value for each column;
-    // in the 2d arrays below, the first index is the column and the second
-    // holds the min or max value for that column
-    //    byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
-
-    byte[][] allMinValue = new byte[keyStorageArray.length][];
-    byte[][] allMaxValue = new byte[keyStorageArray.length][];
-    byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
-    boolean[] colGrpBlock = new boolean[keyStorageArray.length];
-
-    for (int i = 0; i < keyLengths.length; i++) {
-      keyLengths[i] = keyBlockData[i].length;
-      isSortedData[i] = keyStorageArray[i].isAlreadySorted();
-      if (!isSortedData[i]) {
-        keyBlockSize++;
-
-      }
-      totalKeySize += keyLengths[i];
-      if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
-        allMinValue[i] = keyStorageArray[i].getMin();
-        allMaxValue[i] = keyStorageArray[i].getMax();
-      } else {
-        allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
-        allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
-      }
-      //if keyStorageArray is an instance of ColGroupBlockStorage then it is a colGroup chunk
-      if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
-        colGrpBlock[i] = true;
-      }
-    }
-    int[] keyBlockIdxLengths = new int[keyBlockSize];
-    byte[][] dataAfterCompression = new byte[keyBlockSize][];
-    byte[][] indexMap = new byte[keyBlockSize][];
-    int idx = 0;
-    for (int i = 0; i < isSortedData.length; i++) {
-      if (!isSortedData[i]) {
-        dataAfterCompression[idx] =
-            numberCompressor.compress(keyStorageArray[i].getDataAfterComp());
-        if (null != keyStorageArray[i].getIndexMap()
-            && keyStorageArray[i].getIndexMap().length > 0) {
-          indexMap[idx] = numberCompressor.compress(keyStorageArray[i].getIndexMap());
-        } else {
-          indexMap[idx] = new byte[0];
-        }
-        keyBlockIdxLengths[idx] = (dataAfterCompression[idx].length + indexMap[idx].length)
-            + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-        idx++;
-      }
-    }
-    int compressDataBlockSize = 0;
-    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
-      if (dataWriterVo.getAggBlocks()[i]) {
-        compressDataBlockSize++;
-      }
-    }
-    byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
-    int[] dataIndexMapLength = new int[compressDataBlockSize];
-    idx = 0;
-    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
-      if (dataWriterVo.getAggBlocks()[i]) {
-        try {
-          compressedDataIndex[idx] =
-              numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
-          dataIndexMapLength[idx] = compressedDataIndex[idx].length;
-          idx++;
-        } catch (Exception e) {
-          throw new CarbonDataWriterException(e.getMessage());
-        }
-      }
-    }
-
-    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
-    // calculate the total size required for all the measure and get the
-    // each measure size
-    for (int i = 0; i < dataArray.length; i++) {
-      currentMsrLenght = dataArray[i].length;
-      totalMsrArrySize += currentMsrLenght;
-      msrLength[i] = currentMsrLenght;
-    }
-    NodeHolder holder = new NodeHolder();
-    holder.setDataArray(dataArray);
-    holder.setKeyArray(keyBlockData);
-    // end key format will be <length of dictionary key><length of no
-    // dictionary key><DictionaryKey><No Dictionary key>
-    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
-    ByteBuffer buffer = ByteBuffer.allocate(
-        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
-            + endKey.length + updatedNoDictionaryEndKey.length);
-    buffer.putInt(endKey.length);
-    buffer.putInt(updatedNoDictionaryEndKey.length);
-    buffer.put(endKey);
-    buffer.put(updatedNoDictionaryEndKey);
-    buffer.rewind();
-    holder.setEndKey(buffer.array());
-    holder.setMeasureLenght(msrLength);
-    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
-    // start key format will be <length of dictionary key><length of no
-    // dictionary key><DictionaryKey><No Dictionary key>
-    buffer = ByteBuffer.allocate(
-        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
-            + startKey.length + updatedNoDictionaryStartKey.length);
-    buffer.putInt(startKey.length);
-    buffer.putInt(updatedNoDictionaryStartKey.length);
-    buffer.put(startKey);
-    buffer.put(updatedNoDictionaryStartKey);
-    buffer.rewind();
-    holder.setStartKey(buffer.array());
-    holder.setEntryCount(entryCount);
-    holder.setKeyLengths(keyLengths);
-    holder.setKeyBlockIndexLength(keyBlockIdxLengths);
-    holder.setIsSortedKeyBlock(isSortedData);
-    holder.setCompressedIndex(dataAfterCompression);
-    holder.setCompressedIndexMap(indexMap);
-    holder.setDataIndexMapLength(dataIndexMapLength);
-    holder.setCompressedDataIndex(compressedDataIndex);
-    holder.setCompressionModel(compressionModel);
-    holder.setTotalDimensionArrayLength(totalKeySize);
-    holder.setTotalMeasureArrayLength(totalMsrArrySize);
-    //setting column min max value
-    holder.setColumnMaxData(allMaxValue);
-    holder.setColumnMinData(allMinValue);
-    holder.setAggBlocks(dataWriterVo.getAggBlocks());
-    holder.setColGrpBlocks(colGrpBlock);
-    return holder;
-  }
-
-  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
-    int indexBlockSize = 0;
-    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
-      indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-    }
-
-    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
-      indexBlockSize += holder.getDataIndexMapLength()[i];
-    }
-
-    long blockletDataSize =
-        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
-            + indexBlockSize;
-    updateBlockletFileChannel(blockletDataSize);
-    // write data to file and get its offset
-    long offset = writeDataToFile(holder, fileChannel);
-    // get the blocklet info for currently added blocklet
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
-    // add blocklet info to list
-    blockletInfoList.add(blockletInfo);
-    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
-  }
-
-  /**
-   * This method is responsible for writing blocklet to the data file
-   *
-   * @return file offset; the offset is the current position in the file
-   * @throws CarbonDataWriterException when anything goes wrong
-   *                                   while writing the leaf file
-   */
-  private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
-      throws CarbonDataWriterException {
-    // create byte buffer
-    byte[][] compressedIndex = nodeHolder.getCompressedIndex();
-    byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap();
-    byte[][] compressedDataIndex = nodeHolder.getCompressedDataIndex();
-    int indexBlockSize = 0;
-    int index = 0;
-    for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) {
-      indexBlockSize +=
-          nodeHolder.getKeyBlockIndexLength()[index++] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
-    }
-
-    for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) {
-      indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
-    }
-    ByteBuffer byteBuffer = ByteBuffer.allocate(
-        nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
-            + indexBlockSize);
-    long offset = 0;
-    try {
-      // get the current offset
-      offset = channel.size();
-      // add key array to byte buffer
-      for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
-        byteBuffer.put(nodeHolder.getKeyArray()[i]);
-      }
-      for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
-        byteBuffer.put(nodeHolder.getDataArray()[i]);
-      }
-      // add measure data array to byte buffer
-
-      ByteBuffer buffer1 = null;
-      for (int i = 0; i < compressedIndex.length; i++) {
-        buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
-        buffer1.putInt(compressedIndex[i].length);
-        buffer1.put(compressedIndex[i]);
-        if (compressedIndexMap[i].length > 0) {
-          buffer1.put(compressedIndexMap[i]);
-        }
-        buffer1.rewind();
-        byteBuffer.put(buffer1.array());
-
-      }
-      for (int i = 0; i < compressedDataIndex.length; i++) {
-        byteBuffer.put(compressedDataIndex[i]);
-      }
-      byteBuffer.flip();
-      // write data to file
-      channel.write(byteBuffer);
-    } catch (IOException exception) {
-      throw new CarbonDataWriterException("Problem in writing carbon file: ", exception);
-    }
-    // return the offset; this offset will be used while reading the file on
-    // the engine side, to know from which position to start reading the file
-    return offset;
-  }
-
-  /**
-   * This method will be used to get the blocklet metadata
-   *
-   * @return BlockletInfo - blocklet metadata
-   */
-  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
-    // create the info object for leaf entry
-    BlockletInfoColumnar info = new BlockletInfoColumnar();
-    //add aggBlocks array
-    info.setAggKeyBlock(nodeHolder.getAggBlocks());
-    // add total entry count
-    info.setNumberOfKeys(nodeHolder.getEntryCount());
-
-    // add the key array length
-    info.setKeyLengths(nodeHolder.getKeyLengths());
-    // adding null measure index bit set
-    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
-    //add column min max length
-    info.setColumnMaxData(nodeHolder.getColumnMaxData());
-    info.setColumnMinData(nodeHolder.getColumnMinData());
-    long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
-
-    for (int i = 0; i < keyOffSets.length; i++) {
-      keyOffSets[i] = offset;
-      offset += nodeHolder.getKeyLengths()[i];
-    }
-    // key offset will be 8 bytes from the current offset because the first
-    // 4 bytes will be for the number of entries in the leaf, and the next
-    // 4 bytes will be for the key length
-    //        offset += CarbonCommonConstants.INT_SIZE_IN_BYTE * 2;
-
-    // add key offset
-    info.setKeyOffSets(keyOffSets);
-
-    // add measure length
-    info.setMeasureLength(nodeHolder.getMeasureLenght());
-
-    long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
-
-    for (int i = 0; i < msrOffset.length; i++) {
-      // increment the current offset by 4 bytes because 4 bytes will be
-      // used for measure byte length
-      //            offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
-      msrOffset[i] = offset;
-      // now increment the offset by adding measure length to get the next
-      // measure offset;
-      offset += nodeHolder.getMeasureLenght()[i];
-    }
-    // add measure offset
-    info.setMeasureOffset(msrOffset);
-    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
-    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
-    long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
-    for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
-      keyBlockIndexOffsets[i] = offset;
-      offset += nodeHolder.getKeyBlockIndexLength()[i];
-    }
-    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
-    long[] dataIndexMapOffsets = new long[nodeHolder.getDataIndexMapLength().length];
-    for (int i = 0; i < dataIndexMapOffsets.length; i++) {
-      dataIndexMapOffsets[i] = offset;
-      offset += nodeHolder.getDataIndexMapLength()[i];
-    }
-    info.setDataIndexMapOffsets(dataIndexMapOffsets);
-    info.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
-    // set startkey
-    info.setStartKey(nodeHolder.getStartKey());
-    // set end key
-    info.setEndKey(nodeHolder.getEndKey());
-    info.setCompressionModel(nodeHolder.getCompressionModel());
-    // return leaf metadata
-
-    //colGroup Blocks
-    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
-    return info;
-  }
-
-  /**
-   * This method will write the metadata at the end of the file, in thrift format
-   */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
-      String filePath) throws CarbonDataWriterException {
-    try {
-      long currentPosition = channel.size();
-      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
-      FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFileFooter(infoList, localCardinality.length, localCardinality,
-              thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
-      writer.writeFooter(convertFileMeta, currentPosition);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
-    }
-  }
-}
\ No newline at end of file

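The writer above (and its V1 replacement later in this patch) derives every blocklet offset from a single running file position: writeDataToFile records channel.size() before writing, and getBlockletInfo then walks that offset forward by each key length, measure length, key-block-index length, and data-index-map length. A minimal, self-contained sketch of that bookkeeping; the class and method names here are illustrative, not part of the patch:

// Illustrative sketch of the offset bookkeeping used above: one running
// offset is advanced by each block's length to produce per-block offsets.
// All names here are hypothetical, not from the patch.
public final class OffsetWalk {
  /** Returns the start offset of each block when laid out back to back. */
  static long[] assignOffsets(long startOffset, int[] blockLengths) {
    long[] offsets = new long[blockLengths.length];
    long offset = startOffset;
    for (int i = 0; i < blockLengths.length; i++) {
      offsets[i] = offset;       // block i starts at the current position
      offset += blockLengths[i]; // the next block starts right after block i
    }
    return offsets;
  }

  public static void main(String[] args) {
    // e.g. key blocks of 10, 20, 5 bytes written starting at file offset 100
    long[] keyOffsets = assignOffsets(100, new int[] { 10, 20, 5 });
    System.out.println(java.util.Arrays.toString(keyOffsets)); // [100, 110, 130]
  }
}
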
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
new file mode 100644
index 0000000..19e781d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.store.writer.v1;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.columnar.IndexStorage;
+import org.apache.carbondata.core.datastorage.store.compression.ValueCompressionModel;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
+import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.NodeHolder;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> {
+
+  private static final LogService LOGGER = LogServiceFactory
+      .getLogService(CarbonFactDataWriterImplV1.class.getName());
+
+  public CarbonFactDataWriterImplV1(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
+  }
+
+  @Override
+  public NodeHolder buildDataNodeHolder(IndexStorage<int[]>[] keyStorageArray, byte[][] dataArray,
+      int entryCount, byte[] startKey, byte[] endKey, ValueCompressionModel compressionModel,
+      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey) throws CarbonDataWriterException {
+    // if there are no NO-Dictionary columns present in the table then
+    // set an empty byte array
+    if (null == noDictionaryEndKey) {
+      noDictionaryEndKey = new byte[0];
+    }
+    if (null == noDictionaryStartKey) {
+      noDictionaryStartKey = new byte[0];
+    }
+    // total measure length;
+    int totalMsrArrySize = 0;
+    // current measure length;
+    int currentMsrLenght = 0;
+    int totalKeySize = 0;
+    int keyBlockSize = 0;
+
+    boolean[] isSortedData = new boolean[keyStorageArray.length];
+    int[] keyLengths = new int[keyStorageArray.length];
+
+    // below will calculate the min and max value for each column;
+    // in the 2d arrays below, the first index is the column and the second
+    // holds the min or max value for that column
+    //    byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
+
+    byte[][] allMinValue = new byte[keyStorageArray.length][];
+    byte[][] allMaxValue = new byte[keyStorageArray.length][];
+    byte[][] keyBlockData = fillAndCompressedKeyBlockData(keyStorageArray, entryCount);
+    boolean[] colGrpBlock = new boolean[keyStorageArray.length];
+
+    for (int i = 0; i < keyLengths.length; i++) {
+      keyLengths[i] = keyBlockData[i].length;
+      isSortedData[i] = keyStorageArray[i].isAlreadySorted();
+      if (!isSortedData[i]) {
+        keyBlockSize++;
+
+      }
+      totalKeySize += keyLengths[i];
+      if (dataWriterVo.getIsComplexType()[i] || dataWriterVo.getIsDictionaryColumn()[i]) {
+        allMinValue[i] = keyStorageArray[i].getMin();
+        allMaxValue[i] = keyStorageArray[i].getMax();
+      } else {
+        allMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+        allMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+      }
+      //if keyStorageArray is an instance of ColGroupBlockStorage then it is a colGroup chunk
+      if (keyStorageArray[i] instanceof ColGroupBlockStorage) {
+        colGrpBlock[i] = true;
+      }
+    }
+    int[] keyBlockIdxLengths = new int[keyBlockSize];
+    byte[][] dataAfterCompression = new byte[keyBlockSize][];
+    byte[][] indexMap = new byte[keyBlockSize][];
+    int idx = 0;
+    for (int i = 0; i < isSortedData.length; i++) {
+      if (!isSortedData[i]) {
+        dataAfterCompression[idx] =
+            numberCompressor.compress(keyStorageArray[i].getDataAfterComp());
+        if (null != keyStorageArray[i].getIndexMap()
+            && keyStorageArray[i].getIndexMap().length > 0) {
+          indexMap[idx] = numberCompressor.compress(keyStorageArray[i].getIndexMap());
+        } else {
+          indexMap[idx] = new byte[0];
+        }
+        keyBlockIdxLengths[idx] = (dataAfterCompression[idx].length + indexMap[idx].length)
+            + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+        idx++;
+      }
+    }
+    int compressDataBlockSize = 0;
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
+        compressDataBlockSize++;
+      }
+    }
+    byte[][] compressedDataIndex = new byte[compressDataBlockSize][];
+    int[] dataIndexMapLength = new int[compressDataBlockSize];
+    idx = 0;
+    for (int i = 0; i < dataWriterVo.getAggBlocks().length; i++) {
+      if (dataWriterVo.getAggBlocks()[i]) {
+        try {
+          compressedDataIndex[idx] =
+              numberCompressor.compress(keyStorageArray[i].getDataIndexMap());
+          dataIndexMapLength[idx] = compressedDataIndex[idx].length;
+          idx++;
+        } catch (Exception e) {
+          throw new CarbonDataWriterException(e.getMessage());
+        }
+      }
+    }
+
+    int[] msrLength = new int[dataWriterVo.getMeasureCount()];
+    // calculate the total size required for all the measure and get the
+    // each measure size
+    for (int i = 0; i < dataArray.length; i++) {
+      currentMsrLenght = dataArray[i].length;
+      totalMsrArrySize += currentMsrLenght;
+      msrLength[i] = currentMsrLenght;
+    }
+    NodeHolder holder = new NodeHolder();
+    holder.setDataArray(dataArray);
+    holder.setKeyArray(keyBlockData);
+    // end key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    byte[] updatedNoDictionaryEndKey = updateNoDictionaryStartAndEndKey(noDictionaryEndKey);
+    ByteBuffer buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + endKey.length + updatedNoDictionaryEndKey.length);
+    buffer.putInt(endKey.length);
+    buffer.putInt(updatedNoDictionaryEndKey.length);
+    buffer.put(endKey);
+    buffer.put(updatedNoDictionaryEndKey);
+    buffer.rewind();
+    holder.setEndKey(buffer.array());
+    holder.setMeasureLenght(msrLength);
+    byte[] updatedNoDictionaryStartKey = updateNoDictionaryStartAndEndKey(noDictionaryStartKey);
+    // start key format will be <length of dictionary key><length of no
+    // dictionary key><DictionaryKey><No Dictionary key>
+    buffer = ByteBuffer.allocate(
+        CarbonCommonConstants.INT_SIZE_IN_BYTE + CarbonCommonConstants.INT_SIZE_IN_BYTE
+            + startKey.length + updatedNoDictionaryStartKey.length);
+    buffer.putInt(startKey.length);
+    buffer.putInt(updatedNoDictionaryStartKey.length);
+    buffer.put(startKey);
+    buffer.put(updatedNoDictionaryStartKey);
+    buffer.rewind();
+    holder.setStartKey(buffer.array());
+    holder.setEntryCount(entryCount);
+    holder.setKeyLengths(keyLengths);
+    holder.setKeyBlockIndexLength(keyBlockIdxLengths);
+    holder.setIsSortedKeyBlock(isSortedData);
+    holder.setCompressedIndex(dataAfterCompression);
+    holder.setCompressedIndexMap(indexMap);
+    holder.setDataIndexMapLength(dataIndexMapLength);
+    holder.setCompressedDataIndex(compressedDataIndex);
+    holder.setCompressionModel(compressionModel);
+    holder.setTotalDimensionArrayLength(totalKeySize);
+    holder.setTotalMeasureArrayLength(totalMsrArrySize);
+    //setting column min max value
+    holder.setColumnMaxData(allMaxValue);
+    holder.setColumnMinData(allMinValue);
+    holder.setAggBlocks(dataWriterVo.getAggBlocks());
+    holder.setColGrpBlocks(colGrpBlock);
+    return holder;
+  }
+
+  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    int indexBlockSize = 0;
+    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+      indexBlockSize += holder.getKeyBlockIndexLength()[i] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    }
+
+    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+      indexBlockSize += holder.getDataIndexMapLength()[i];
+    }
+
+    long blockletDataSize =
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength()
+            + indexBlockSize;
+    updateBlockletFileChannel(blockletDataSize);
+    // write data to file and get its offset
+    long offset = writeDataToFile(holder, fileChannel);
+    // get the blocklet info for currently added blocklet
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, offset);
+    // add blocklet info to list
+    blockletInfoList.add(blockletInfo);
+    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+  }
+
+  /**
+   * This method is responsible for writing blocklet to the data file
+   *
+   * @return file offset; the offset is the current position in the file
+   * @throws CarbonDataWriterException when anything goes wrong
+   *                                   while writing the leaf file
+   */
+  private long writeDataToFile(NodeHolder nodeHolder, FileChannel channel)
+      throws CarbonDataWriterException {
+    // create byte buffer
+    byte[][] compressedIndex = nodeHolder.getCompressedIndex();
+    byte[][] compressedIndexMap = nodeHolder.getCompressedIndexMap();
+    byte[][] compressedDataIndex = nodeHolder.getCompressedDataIndex();
+    int indexBlockSize = 0;
+    int index = 0;
+    for (int i = 0; i < nodeHolder.getKeyBlockIndexLength().length; i++) {
+      indexBlockSize +=
+          nodeHolder.getKeyBlockIndexLength()[index++] + CarbonCommonConstants.INT_SIZE_IN_BYTE;
+    }
+
+    for (int i = 0; i < nodeHolder.getDataIndexMapLength().length; i++) {
+      indexBlockSize += nodeHolder.getDataIndexMapLength()[i];
+    }
+    ByteBuffer byteBuffer = ByteBuffer.allocate(
+        nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength()
+            + indexBlockSize);
+    long offset = 0;
+    try {
+      // get the current offset
+      offset = channel.size();
+      // add key array to byte buffer
+      for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
+        byteBuffer.put(nodeHolder.getKeyArray()[i]);
+      }
+      for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+        byteBuffer.put(nodeHolder.getDataArray()[i]);
+      }
+      // add measure data array to byte buffer
+
+      ByteBuffer buffer1 = null;
+      for (int i = 0; i < compressedIndex.length; i++) {
+        buffer1 = ByteBuffer.allocate(nodeHolder.getKeyBlockIndexLength()[i]);
+        buffer1.putInt(compressedIndex[i].length);
+        buffer1.put(compressedIndex[i]);
+        if (compressedIndexMap[i].length > 0) {
+          buffer1.put(compressedIndexMap[i]);
+        }
+        buffer1.rewind();
+        byteBuffer.put(buffer1.array());
+
+      }
+      for (int i = 0; i < compressedDataIndex.length; i++) {
+        byteBuffer.put(compressedDataIndex[i]);
+      }
+      byteBuffer.flip();
+      // write data to file
+      channel.write(byteBuffer);
+    } catch (IOException exception) {
+      throw new CarbonDataWriterException("Problem in writing carbon file: ", exception);
+    }
+    // return the offset; this offset will be used while reading the file on
+    // the engine side, to know from which position to start reading the file
+    return offset;
+  }
+
+  /**
+   * This method will be used to get the blocklet metadata
+   *
+   * @return BlockletInfo - blocklet metadata
+   */
+  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+    // create the info object for leaf entry
+    BlockletInfoColumnar info = new BlockletInfoColumnar();
+    //add aggBlocks array
+    info.setAggKeyBlock(nodeHolder.getAggBlocks());
+    // add total entry count
+    info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+    // add the key array length
+    info.setKeyLengths(nodeHolder.getKeyLengths());
+    // adding null measure index bit set
+    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+    //add column min max length
+    info.setColumnMaxData(nodeHolder.getColumnMaxData());
+    info.setColumnMinData(nodeHolder.getColumnMinData());
+    long[] keyOffSets = new long[nodeHolder.getKeyLengths().length];
+
+    for (int i = 0; i < keyOffSets.length; i++) {
+      keyOffSets[i] = offset;
+      offset += nodeHolder.getKeyLengths()[i];
+    }
+    // key offset will be 8 bytes from the current offset because the first
+    // 4 bytes will be for the number of entries in the leaf, and the next
+    // 4 bytes will be for the key length
+    //        offset += CarbonCommonConstants.INT_SIZE_IN_BYTE * 2;
+
+    // add key offset
+    info.setKeyOffSets(keyOffSets);
+
+    // add measure length
+    info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+    long[] msrOffset = new long[dataWriterVo.getMeasureCount()];
+
+    for (int i = 0; i < msrOffset.length; i++) {
+      // increment the current offset by 4 bytes because 4 bytes will be
+      // used for measure byte length
+      //            offset += CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      msrOffset[i] = offset;
+      // now increment the offset by adding measure length to get the next
+      // measure offset;
+      offset += nodeHolder.getMeasureLenght()[i];
+    }
+    // add measure offset
+    info.setMeasureOffset(msrOffset);
+    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+    long[] keyBlockIndexOffsets = new long[nodeHolder.getKeyBlockIndexLength().length];
+    for (int i = 0; i < keyBlockIndexOffsets.length; i++) {
+      keyBlockIndexOffsets[i] = offset;
+      offset += nodeHolder.getKeyBlockIndexLength()[i];
+    }
+    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+    long[] dataIndexMapOffsets = new long[nodeHolder.getDataIndexMapLength().length];
+    for (int i = 0; i < dataIndexMapOffsets.length; i++) {
+      dataIndexMapOffsets[i] = offset;
+      offset += nodeHolder.getDataIndexMapLength()[i];
+    }
+    info.setDataIndexMapOffsets(dataIndexMapOffsets);
+    info.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
+    // set startkey
+    info.setStartKey(nodeHolder.getStartKey());
+    // set end key
+    info.setEndKey(nodeHolder.getEndKey());
+    info.setCompressionModel(nodeHolder.getCompressionModel());
+    // return leaf metadata
+
+    //colGroup Blocks
+    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+    return info;
+  }
+
+  /**
+   * This method will write the metadata at the end of the file, in thrift format
+   */
+  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+      String filePath) throws CarbonDataWriterException {
+    try {
+      long currentPosition = channel.size();
+      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+      FileFooter convertFileMeta = CarbonMetadataUtil
+          .convertFileFooter(infoList, localCardinality.length, localCardinality,
+              thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      writer.writeFooter(convertFileMeta, currentPosition);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+    }
+  }
+}
\ No newline at end of file

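As the comments in buildDataNodeHolder above note, both the start key and the end key are serialized as <length of dictionary key><length of no dictionary key><DictionaryKey><No Dictionary key>. A small self-contained sketch of packing and unpacking that layout; the class and helper names are illustrative, not from the patch:

import java.nio.ByteBuffer;

// Sketch of the start/end key layout used in buildDataNodeHolder above:
// <int: dictionary key length><int: no-dictionary key length>
// <dictionary key bytes><no-dictionary key bytes>
// The class and method names are illustrative, not from the patch.
public final class KeyLayout {
  private static final int INT_SIZE_IN_BYTE = 4;

  static byte[] pack(byte[] dictKey, byte[] noDictKey) {
    ByteBuffer buffer = ByteBuffer.allocate(
        INT_SIZE_IN_BYTE + INT_SIZE_IN_BYTE + dictKey.length + noDictKey.length);
    buffer.putInt(dictKey.length);
    buffer.putInt(noDictKey.length);
    buffer.put(dictKey);
    buffer.put(noDictKey);
    return buffer.array();
  }

  static byte[][] unpack(byte[] packed) {
    ByteBuffer buffer = ByteBuffer.wrap(packed);
    byte[] dictKey = new byte[buffer.getInt()];
    byte[] noDictKey = new byte[buffer.getInt()];
    buffer.get(dictKey);
    buffer.get(noDictKey);
    return new byte[][] { dictKey, noDictKey };
  }

  public static void main(String[] args) {
    byte[][] parts = unpack(pack(new byte[] { 1, 2, 3 }, new byte[] { 9 }));
    System.out.println(parts[0].length + " " + parts[1].length); // 3 1
  }
}
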
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
new file mode 100644
index 0000000..f9f45cc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.store.writer.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.util.CarbonMetadataUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.writer.CarbonFooterWriter;
+import org.apache.carbondata.format.DataChunk2;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
+import org.apache.carbondata.processing.store.writer.NodeHolder;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
+
+/**
+ * This class will be used to write the data in the version 2 format
+ */
+public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 {
+
+  /**
+   * logger
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonFactDataWriterImplV2.class.getName());
+
+  /**
+   * Constructor create instance of this class
+   *
+   * @param dataWriterVo
+   */
+  public CarbonFactDataWriterImplV2(CarbonDataWriterVo dataWriterVo) {
+    super(dataWriterVo);
+  }
+
+  /**
+   * Below method will be used to write the data to the carbon data file
+   *
+   * @param holder
+   * @throws CarbonDataWriterException any problem in writing operation
+   */
+  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
+    // size to calculate the size of the blocklet
+    int size = 0;
+    // get the blocklet info object
+    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
+
+    List<DataChunk2> datachunks = null;
+    try {
+      // get all the data chunks
+      datachunks = CarbonMetadataUtil
+          .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
+    }
+    // data chunk byte array
+    byte[][] dataChunkByteArray = new byte[datachunks.size()][];
+    for (int i = 0; i < dataChunkByteArray.length; i++) {
+      dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
+      // add the data chunk size
+      size += dataChunkByteArray[i].length;
+    }
+    // add row id index length
+    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
+      size += holder.getKeyBlockIndexLength()[i];
+    }
+    // add rle index length
+    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
+      size += holder.getDataIndexMapLength()[i];
+    }
+    // add dimension column data page and measure column data page size
+    long blockletDataSize =
+        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
+    // if size of the file already reached threshold size then create a new file and get the file
+    // channel object
+    updateBlockletFileChannel(blockletDataSize);
+    // write the version header in the file if the current file size is zero;
+    // this is done so the carbondata file can be read separately
+    try {
+      if (fileChannel.size() == 0) {
+        ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+        byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
+        ByteBuffer buffer = ByteBuffer.allocate(header.length);
+        buffer.put(header);
+        buffer.rewind();
+        fileChannel.write(buffer);
+      }
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
+    }
+    // write data to file and get its offset
+    writeDataToFile(holder, dataChunkByteArray, fileChannel);
+    // add blocklet info to list
+    blockletInfoList.add(blockletInfo);
+    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
+  }
+
+  /**
+   * Below method will be used to write the data to the file.
+   * Data format:
+   * <DColumn1DataChunk><DColumn1DataPage><DColumn1Rle>
+   * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
+   * <DColumn3DataChunk><DColumn3DataPage><DColumn3RowIds>
+   * <MColumn1DataChunk><MColumn1DataPage>
+   * <MColumn2DataChunk><MColumn2DataPage>
+   * <MColumn3DataChunk><MColumn3DataPage>
+   *
+   * @param nodeHolder
+   * @param dataChunksBytes
+   * @param channel
+   * @throws CarbonDataWriterException
+   */
+  private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
+      throws CarbonDataWriterException {
+    long offset = 0;
+    try {
+      offset = channel.size();
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while getting the file channel size");
+    }
+    List<Long> currentDataChunksOffset = new ArrayList<>();
+    List<Short> currentDataChunksLength = new ArrayList<>();
+    dataChunksLength.add(currentDataChunksLength);
+    dataChunksOffsets.add(currentDataChunksOffset);
+    int bufferSize = 0;
+    int rowIdIndex = 0;
+    int rleIndex = 0;
+    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add((short) dataChunksBytes[i].length);
+      bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
+          .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
+          dataWriterVo.getAggBlocks()[i] ?
+              nodeHolder.getCompressedDataIndex()[rleIndex].length :
+              0);
+      offset += dataChunksBytes[i].length;
+      offset += nodeHolder.getKeyLengths()[i];
+      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+        offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
+        rowIdIndex++;
+      }
+      if (dataWriterVo.getAggBlocks()[i]) {
+        offset += nodeHolder.getDataIndexMapLength()[rleIndex];
+        rleIndex++;
+      }
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    rleIndex = 0;
+    rowIdIndex = 0;
+    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
+      buffer.put(dataChunksBytes[i]);
+      buffer.put(nodeHolder.getKeyArray()[i]);
+      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
+        buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
+        buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
+        if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
+          buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
+        }
+        rowIdIndex++;
+      }
+      if (dataWriterVo.getAggBlocks()[i]) {
+        buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
+        rleIndex++;
+      }
+    }
+    try {
+      buffer.flip();
+      channel.write(buffer);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while writing the dimension data in carbon data file", e);
+    }
+
+    int dataChunkIndex = nodeHolder.getKeyArray().length;
+    int totalLength = 0;
+    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+      currentDataChunksOffset.add(offset);
+      currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
+      offset += dataChunksBytes[dataChunkIndex].length;
+      offset += nodeHolder.getDataArray()[i].length;
+      totalLength += dataChunksBytes[dataChunkIndex].length;
+      totalLength += nodeHolder.getDataArray()[i].length;
+      dataChunkIndex++;
+    }
+    buffer = ByteBuffer.allocate(totalLength);
+    dataChunkIndex = nodeHolder.getKeyArray().length;
+    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
+      buffer.put(dataChunksBytes[dataChunkIndex++]);
+      buffer.put(nodeHolder.getDataArray()[i]);
+    }
+    try {
+      buffer.flip();
+      channel.write(buffer);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException(
+          "Problem while writing the measure data in carbon data file", e);
+    }
+  }
+
+  /**
+   * This method will be used to get the blocklet metadata
+   *
+   * @return BlockletInfo - blocklet metadata
+   */
+  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
+    // create the info object for leaf entry
+    BlockletInfoColumnar info = new BlockletInfoColumnar();
+    //add aggBlocks array
+    info.setAggKeyBlock(nodeHolder.getAggBlocks());
+    // add total entry count
+    info.setNumberOfKeys(nodeHolder.getEntryCount());
+
+    // add the key array length
+    info.setKeyLengths(nodeHolder.getKeyLengths());
+    // adding null measure index bit set
+    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
+    //add column min max length
+    info.setColumnMaxData(nodeHolder.getColumnMaxData());
+    info.setColumnMinData(nodeHolder.getColumnMinData());
+
+    // add measure length
+    info.setMeasureLength(nodeHolder.getMeasureLenght());
+
+    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
+    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
+    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
+    // set startkey
+    info.setStartKey(nodeHolder.getStartKey());
+    // set end key
+    info.setEndKey(nodeHolder.getEndKey());
+    info.setCompressionModel(nodeHolder.getCompressionModel());
+    // return leaf metadata
+
+    //colGroup Blocks
+    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
+
+    return info;
+  }
+
+  /**
+   * This method will write the metadata at the end of the file, in thrift format
+   */
+  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
+      String filePath) throws CarbonDataWriterException {
+    try {
+      // get the current file position
+      long currentPosition = channel.size();
+      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
+      // get thrift file footer instance
+      FileFooter convertFileMeta = CarbonMetadataUtil
+          .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
+              dataChunksOffsets, dataChunksLength);
+      // fill the carbon index details
+      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
+      // write the footer
+      writer.writeFooter(convertFileMeta, currentPosition);
+    } catch (IOException e) {
+      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
+    }
+  }
+}

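Unlike V1, the V2 writer stamps a version header at the start of every new file (see writeBlockletData above) so that a carbondata file can be identified before parsing. A minimal sketch of writing and reading back such a header; the actual header string lives in CarbonCommonConstants.CARBON_DATA_VERSION_HEADER and its value is not shown in this patch, so the constant below is an assumption:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Sketch of the V2 version-header handshake shown in writeBlockletData:
// an empty file is prefixed with <header prefix><version>, and a reader
// checks that prefix to pick a format version. The header value below is
// an assumption; the real one comes from CarbonCommonConstants.
public final class VersionHeaderSketch {
  private static final String VERSION_HEADER_PREFIX = "CARBONDATAVERSION#"; // assumed value

  static void writeHeaderIfNew(FileChannel channel, String version) throws IOException {
    if (channel.size() == 0) {
      byte[] header = (VERSION_HEADER_PREFIX + version).getBytes(StandardCharsets.UTF_8);
      channel.write(ByteBuffer.wrap(header));
    }
  }

  static String readVersion(FileChannel channel) throws IOException {
    byte[] prefix = VERSION_HEADER_PREFIX.getBytes(StandardCharsets.UTF_8);
    // read the prefix plus a short version token from the start of the file
    ByteBuffer buffer = ByteBuffer.allocate(prefix.length + 2);
    channel.read(buffer, 0);
    buffer.flip();
    byte[] read = new byte[buffer.remaining()];
    buffer.get(read);
    String text = new String(read, StandardCharsets.UTF_8);
    return text.startsWith(VERSION_HEADER_PREFIX)
        ? text.substring(VERSION_HEADER_PREFIX.length()) : null;
  }
}
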
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 84192b8..328e33b 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.BlockIndexStore;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
@@ -52,7 +53,7 @@ public class BlockIndexStoreTest extends TestCase {
   @BeforeClass public void setUp() {
 	property = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
 	
-	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");  
+	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1");
     StoreCreator.createCarbonStore();
     indexStore = BlockIndexStore.getInstance();
   }
@@ -61,7 +62,7 @@ public class BlockIndexStoreTest extends TestCase {
 	    if(null!=property) {
 		CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, property);
 	    }else {
-	    	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION+"");
+	    	CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
 	    }
 	  }
 
@@ -71,7 +72,7 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(),(short)1);
+            file.length(), ColumnarFormatVersion.V1);
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
     AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -93,20 +94,20 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info1 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info2 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info3 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info4 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");
@@ -148,31 +149,31 @@ public class BlockIndexStoreTest extends TestCase {
     File file = getPartFile();
     TableBlockInfo info =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info1 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info2 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info3 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
     TableBlockInfo info4 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info5 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length(),(short)1);
+            file.length(),ColumnarFormatVersion.V1);
     TableBlockInfo info6 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     TableBlockInfo info7 =
         new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" },
-            file.length(), (short)1);
+            file.length(), ColumnarFormatVersion.V1);
 
     CarbonTableIdentifier carbonTableIdentifier =
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1");

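The test changes above show the shape of the new API: TableBlockInfo now takes a ColumnarFormatVersion value instead of a raw (short)1, and the CARBON_DATA_FILE_VERSION property is set to the enum name "V1" rather than the number "1". The enum itself (core/carbon/ColumnarFormatVersion.java, 50 lines in the diffstat below) is not included in this excerpt, so the following is only a hypothetical sketch of what such an enum plausibly provides:

// Hypothetical sketch of a format-version enum matching the usage in the
// tests above (ColumnarFormatVersion.V1, property value "V1"). The real
// enum in core/carbon/ColumnarFormatVersion.java is not shown in this
// excerpt, so the members and helpers here are assumptions.
public enum ColumnarFormatVersionSketch {
  V1((short) 1),
  V2((short) 2);

  private final short number;

  ColumnarFormatVersionSketch(short number) {
    this.number = number;
  }

  public short number() {
    return number;
  }

  /** Parses a property value such as "V1" or "V2"; falls back to V1. */
  public static ColumnarFormatVersionSketch fromProperty(String value) {
    for (ColumnarFormatVersionSketch v : values()) {
      if (v.name().equalsIgnoreCase(value)) {
        return v;
      }
    }
    return V1; // assumed default
  }
}
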

[3/3] incubator-carbondata git commit: [CARBONDATA-480] Add file format version enum. This closes #378

Posted by ra...@apache.org.
[CARBONDATA-480] Add file format version enum. This closes #378


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/5406cee1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/5406cee1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/5406cee1

Branch: refs/heads/master
Commit: 5406cee1bb8c46dd903865ca785b8a039f753256
Parents: 90bc366 0ef3fb8
Author: ravipesala <ra...@gmail.com>
Authored: Thu Dec 1 22:05:11 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 1 22:05:11 2016 +0530

----------------------------------------------------------------------
 conf/dataload.properties.template               |   4 +-
 .../core/carbon/ColumnarFormatVersion.java      |  50 +++
 .../carbon/datastore/block/TableBlockInfo.java  |  11 +-
 .../chunk/reader/CarbonDataReaderFactory.java   |  17 +-
 .../metadata/blocklet/DataFileFooter.java       |   7 +-
 .../core/constants/CarbonCommonConstants.java   |   2 +-
 .../util/AbstractDataFileFooterConverter.java   |   4 +-
 .../core/util/CarbonMetadataUtil.java           |  11 +-
 .../carbondata/core/util/CarbonProperties.java  |  49 ++-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 -
 .../core/util/DataFileFooterConverter.java      |   3 +-
 .../core/util/DataFileFooterConverter2.java     |   3 +-
 .../util/DataFileFooterConverterFactory.java    |  14 +-
 .../carbon/datastore/block/BlockInfoTest.java   |  13 +-
 .../datastore/block/TableBlockInfoTest.java     |  33 +-
 .../datastore/block/TableTaskInfoTest.java      |   9 +-
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   9 +-
 .../core/util/DataFileFooterConverterTest.java  |   5 +-
 .../carbondata/examples/CarbonExample.scala     |   2 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   6 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  21 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../TestQueryWithOldCarbonDataFile.scala        |   8 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../store/CarbonDataWriterFactory.java          |  16 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +-
 .../store/writer/CarbonFactDataWriterImpl2.java | 285 --------------
 ...actDataWriterImplForIntIndexAndAggBlock.java | 379 ------------------
 .../writer/v1/CarbonFactDataWriterImplV1.java   | 382 +++++++++++++++++++
 .../writer/v2/CarbonFactDataWriterImplV2.java   | 288 ++++++++++++++
 .../carbon/datastore/BlockIndexStoreTest.java   |  33 +-
 32 files changed, 891 insertions(+), 798 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-carbondata git commit: add file format version enum

Posted by ra...@apache.org.
add file format version enum


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/0ef3fb81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/0ef3fb81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/0ef3fb81

Branch: refs/heads/master
Commit: 0ef3fb81883e782fffef0beae01a429684893960
Parents: 90bc366
Author: jackylk <ja...@huawei.com>
Authored: Thu Dec 1 23:02:16 2016 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 1 22:04:26 2016 +0530

----------------------------------------------------------------------
 conf/dataload.properties.template               |   4 +-
 .../core/carbon/ColumnarFormatVersion.java      |  50 +++
 .../carbon/datastore/block/TableBlockInfo.java  |  11 +-
 .../chunk/reader/CarbonDataReaderFactory.java   |  17 +-
 .../metadata/blocklet/DataFileFooter.java       |   7 +-
 .../core/constants/CarbonCommonConstants.java   |   2 +-
 .../util/AbstractDataFileFooterConverter.java   |   4 +-
 .../core/util/CarbonMetadataUtil.java           |  11 +-
 .../carbondata/core/util/CarbonProperties.java  |  49 ++-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 -
 .../core/util/DataFileFooterConverter.java      |   3 +-
 .../core/util/DataFileFooterConverter2.java     |   3 +-
 .../util/DataFileFooterConverterFactory.java    |  14 +-
 .../carbon/datastore/block/BlockInfoTest.java   |  13 +-
 .../datastore/block/TableBlockInfoTest.java     |  33 +-
 .../datastore/block/TableTaskInfoTest.java      |   9 +-
 .../core/util/CarbonMetadataUtilTest.java       |   2 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   9 +-
 .../core/util/DataFileFooterConverterTest.java  |   5 +-
 .../carbondata/examples/CarbonExample.scala     |   2 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   6 +-
 .../carbondata/hadoop/CarbonInputSplit.java     |  21 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../TestQueryWithOldCarbonDataFile.scala        |   8 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   7 +-
 .../store/CarbonDataWriterFactory.java          |  16 +-
 .../store/CarbonFactDataHandlerColumnar.java    |   4 +-
 .../store/writer/CarbonFactDataWriterImpl2.java | 285 --------------
 ...actDataWriterImplForIntIndexAndAggBlock.java | 379 ------------------
 .../writer/v1/CarbonFactDataWriterImplV1.java   | 382 +++++++++++++++++++
 .../writer/v2/CarbonFactDataWriterImplV2.java   | 288 ++++++++++++++
 .../carbon/datastore/BlockIndexStoreTest.java   |  33 +-
 32 files changed, 891 insertions(+), 798 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/conf/dataload.properties.template
----------------------------------------------------------------------
diff --git a/conf/dataload.properties.template b/conf/dataload.properties.template
index 59cad4a..d5e9d6a 100644
--- a/conf/dataload.properties.template
+++ b/conf/dataload.properties.template
@@ -18,14 +18,14 @@
 
 #carbon store path
 # you should change to the code path of your local machine
-carbon.storelocation=/Users/wangfei/code/incubator-carbondata/examples/spark2/target/store
+carbon.storelocation=/Users/jackylk/code/incubator-carbondata/examples/spark2/target/store
 
 #true: use kettle to load data
 #false: use new flow to load data
 use_kettle=true
 
 # you should change to the code path of your local machine
-carbon.kettle.home=/Users/wangfei/code/incubator-carbondata/processing/carbonplugins
+carbon.kettle.home=/Users/jackylk/code/incubator-carbondata/processing/carbonplugins
 
 #csv delimiter character
 delimiter=,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java b/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
new file mode 100644
index 0000000..bef345c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/ColumnarFormatVersion.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.core.carbon;
+
+public enum ColumnarFormatVersion {
+  V1((short)1),
+  V2((short)2);
+
+  private short version;
+  ColumnarFormatVersion(short version) {
+    this.version = version;
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnarFormatV" + version;
+  }
+
+  public short number() {
+    return version;
+  }
+
+  public static ColumnarFormatVersion valueOf(short version) {
+    switch (version) {
+      case 1:
+        return V1;
+      case 2:
+        return V2;
+      default:
+        throw new IllegalArgumentException("invalid format version: " + version);
+    }
+  }
+}
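
For reference, a minimal usage sketch of the enum above. It assumes only that the carbondata-core classes from this commit are on the classpath; the class name FormatVersionDemo is illustrative.

  import org.apache.carbondata.core.carbon.ColumnarFormatVersion;

  public class FormatVersionDemo {
    public static void main(String[] args) {
      // promote the on-disk short code back to the enum
      ColumnarFormatVersion v = ColumnarFormatVersion.valueOf((short) 2);
      System.out.println(v);           // ColumnarFormatV2
      System.out.println(v.number());  // 2

      // any other code fails fast instead of silently defaulting
      try {
        ColumnarFormatVersion.valueOf((short) 9);
      } catch (IllegalArgumentException e) {
        System.out.println(e.getMessage());  // invalid format version: 9
      }
    }
  }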

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
index 0d60567..802a116 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.core.carbon.datastore.block;
 
 import java.io.Serializable;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath.DataFileUtil;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
@@ -57,14 +58,14 @@ public class TableBlockInfo implements Distributable, Serializable {
 
   private String[] locations;
 
-  private short version;
+  private ColumnarFormatVersion version;
   /**
    * The class holds the blockletsinfo
    */
   private BlockletInfos blockletInfos = new BlockletInfos();
 
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength, short version) {
+      long blockLength, ColumnarFormatVersion version) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
@@ -84,7 +85,7 @@ public class TableBlockInfo implements Distributable, Serializable {
    * @param blockletInfos
    */
   public TableBlockInfo(String filePath, long blockOffset, String segmentId, String[] locations,
-      long blockLength, BlockletInfos blockletInfos, short version) {
+      long blockLength, BlockletInfos blockletInfos, ColumnarFormatVersion version) {
     this.filePath = FileFactory.getUpdatedFilePath(filePath);
     this.blockOffset = blockOffset;
     this.segmentId = segmentId;
@@ -259,11 +260,11 @@ public class TableBlockInfo implements Distributable, Serializable {
     this.blockletInfos = blockletInfos;
   }
 
-  public short getVersion() {
+  public ColumnarFormatVersion getVersion() {
     return version;
   }
 
-  public void setVersion(short version) {
+  public void setVersion(ColumnarFormatVersion version) {
     this.version = version;
   }
 }
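
A sketch of constructing a TableBlockInfo against the new signature; the file path, segment id, and locations below are hypothetical values, and only the last argument changes relative to the old short-based constructor.

  import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
  import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;

  public class TableBlockInfoDemo {
    public static void main(String[] args) {
      String[] locations = { "host1", "host2" };  // hypothetical block locations
      TableBlockInfo info = new TableBlockInfo(
          "/store/default/t3/part-0",  // hypothetical data file path
          0L, "0", locations, 1024L,
          ColumnarFormatVersion.V1);   // enum instead of (short) 1
      System.out.println(info.getVersion());  // ColumnarFormatV1
    }
  }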

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
index 08a1869..9bf7e62 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/chunk/reader/CarbonDataReaderFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.chunk.reader;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v1.CompressedDimensionChunkFileBasedReaderV1;
 import org.apache.carbondata.core.carbon.datastore.chunk.reader.dimension.v2.CompressedDimensionChunkFileBasedReaderV2;
 import org.apache.carbondata.core.carbon.datastore.chunk.reader.measure.v1.CompressedMeasureChunkFileBasedReaderV1;
@@ -60,15 +61,17 @@ public class CarbonDataReaderFactory {
    * @param filePath            carbon data file path
    * @return dimension column data reader based on version number
    */
-  public DimensionColumnChunkReader getDimensionColumnChunkReader(short version,
+  public DimensionColumnChunkReader getDimensionColumnChunkReader(ColumnarFormatVersion version,
       BlockletInfo blockletInfo, int[] eachColumnValueSize, String filePath) {
     switch (version) {
-      case 2:
+      case V2:
         return new CompressedDimensionChunkFileBasedReaderV2(blockletInfo, eachColumnValueSize,
             filePath);
-      default:
+      case V1:
         return new CompressedDimensionChunkFileBasedReaderV1(blockletInfo, eachColumnValueSize,
             filePath);
+      default:
+        throw new IllegalArgumentException("invalid format version: " + version);
     }
   }
 
@@ -80,13 +83,15 @@ public class CarbonDataReaderFactory {
    * @param filePath     carbon data file path
    * @return measure column data reader based on version number
    */
-  public MeasureColumnChunkReader getMeasureColumnChunkReader(short version,
+  public MeasureColumnChunkReader getMeasureColumnChunkReader(ColumnarFormatVersion version,
       BlockletInfo blockletInfo, String filePath) {
     switch (version) {
-      case 2:
+      case V2:
         return new CompressedMeasureChunkFileBasedReaderV2(blockletInfo, filePath);
-      default:
+      case V1:
         return new CompressedMeasureChunkFileBasedReaderV1(blockletInfo, filePath);
+      default:
+        throw new IllegalArgumentException("invalid format version: " + version);
     }
 
   }
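
Both factory methods now reduce to an exhaustive switch over the enum. A self-contained sketch of that dispatch pattern follows; Reader, ReaderV1 and ReaderV2 are hypothetical stand-ins for the V1/V2 chunk reader classes.

  import org.apache.carbondata.core.carbon.ColumnarFormatVersion;

  public class DispatchDemo {
    interface Reader { }                        // hypothetical stand-in
    static class ReaderV1 implements Reader { }
    static class ReaderV2 implements Reader { }

    static Reader readerFor(ColumnarFormatVersion version) {
      switch (version) {
        case V1: return new ReaderV1();
        case V2: return new ReaderV2();
        default:  // unreachable today, but keeps future versions from falling through
          throw new IllegalArgumentException("invalid format version: " + version);
      }
    }

    public static void main(String[] args) {
      System.out.println(readerFor(ColumnarFormatVersion.V2).getClass().getSimpleName());
    }
  }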

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
index be235ba..a82bac9 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/blocklet/DataFileFooter.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.carbon.metadata.blocklet;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.index.BlockletIndex;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
@@ -38,7 +39,7 @@ public class DataFileFooter implements Serializable {
   /**
    * version used for data compatibility
    */
-  private short versionId;
+  private ColumnarFormatVersion versionId;
 
   /**
    * total number of rows in this file
@@ -73,14 +74,14 @@ public class DataFileFooter implements Serializable {
   /**
    * @return the versionId
    */
-  public short getVersionId() {
+  public ColumnarFormatVersion getVersionId() {
     return versionId;
   }
 
   /**
    * @param versionId the versionId to set
    */
-  public void setVersionId(short versionId) {
+  public void setVersionId(ColumnarFormatVersion versionId) {
     this.versionId = versionId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 443c8c4..1ac2ba1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -900,7 +900,7 @@ public final class CarbonCommonConstants {
   /**
    * current data file version
    */
-  public static final short CARBON_DATA_FILE_DEFAULT_VERSION = 2;
+  public static final String CARBON_DATA_FILE_DEFAULT_VERSION = "V2";
   /**
    * number of column data will read in IO operation
    * during query execution

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index db9c9be..7f50c34 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -99,7 +100,8 @@ public abstract class AbstractDataFileFooterConverter {
         dataFileFooter = new DataFileFooter();
         TableBlockInfo tableBlockInfo = tableBlockInfoList.get(counter++);
         tableBlockInfo.setBlockOffset(readBlockIndexInfo.getOffset());
-        tableBlockInfo.setVersion((short) readIndexHeader.getVersion());
+        tableBlockInfo.setVersion(
+            ColumnarFormatVersion.valueOf((short) readIndexHeader.getVersion()));
         int blockletSize = getBlockletSize(readBlockIndexInfo);
         tableBlockInfo.getBlockletInfos().setNoOfBlockLets(blockletSize);
         dataFileFooter.setBlockletIndex(blockletIndex);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 4f8a435..bdd6fae 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -30,6 +30,7 @@ import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.metadata.index.BlockIndexInfo;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -97,10 +98,9 @@ public class CarbonMetadataUtil {
     SegmentInfo segmentInfo = new SegmentInfo();
     segmentInfo.setNum_cols(columnSchemaList.size());
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(cardinalities));
-    short version = Short.parseShort(
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
     FileFooter footer = new FileFooter();
-    footer.setVersion(version);
+    footer.setVersion(version.number());
     footer.setNum_rows(getTotalNumberOfRows(infoList));
     footer.setSegment_info(segmentInfo);
     footer.setTable_columns(columnSchemaList);
@@ -476,9 +476,8 @@ public class CarbonMetadataUtil {
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     // create index header object
     IndexHeader indexHeader = new IndexHeader();
-    short version = Short.parseShort(
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
-    indexHeader.setVersion(version);
+    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+    indexHeader.setVersion(version.number());
     // set the segment info
     indexHeader.setSegment_info(segmentInfo);
     // set the column names
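
The net effect on serialization: writers put version.number() into the thrift footer and index header, and readers rebuild the enum with valueOf. A minimal round-trip, using a local short in place of the thrift field:

  import org.apache.carbondata.core.carbon.ColumnarFormatVersion;

  public class VersionRoundTrip {
    public static void main(String[] args) {
      ColumnarFormatVersion version = ColumnarFormatVersion.V2;

      short onDisk = version.number();  // write side: only the short code is stored

      ColumnarFormatVersion restored = ColumnarFormatVersion.valueOf(onDisk);
      System.out.println(restored == version);  // true
    }
  }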

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index adb0e6a..f4ec63d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -27,6 +27,7 @@ import java.util.Properties;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 
 public final class CarbonProperties {
@@ -263,27 +264,25 @@ public final class CarbonProperties {
    * if parameter is invalid current version will be set
    */
   private void validateCarbonDataFileVersion() {
-    short carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
     String carbondataFileVersionString =
         carbonProperties.getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
-    try {
-      carbondataFileVersion = Short.parseShort(carbondataFileVersionString);
-    } catch (NumberFormatException e) {
-      carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
-      LOGGER.info("Current Data file version property is invalid  \"" + carbondataFileVersionString
-          + "\" is invalid. Using the Current data file version value \"" + carbondataFileVersion);
-      carbonProperties
-          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + "");
-    }
-    if (carbondataFileVersion > CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
-        || carbondataFileVersion < 0) {
-      LOGGER.info("Current Data file version property is invalid  \"" + carbondataFileVersionString
-          + "\" is invalid. Using the Current data file version value \"" + carbondataFileVersion);
-      carbondataFileVersion = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+    if (carbondataFileVersionString == null) {
+      // use default property if user does not specify version property
       carbonProperties
-          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, carbondataFileVersion + "");
+          .setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+              CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+    } else {
+      try {
+        ColumnarFormatVersion.valueOf(carbondataFileVersionString);
+      } catch (IllegalArgumentException e) {
+        // use default property if user specifies an invalid version property
+        LOGGER.warn("Specified file version property is invalid: " +
+            carbondataFileVersionString + ". Using " +
+            CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION + " as default file version");
+        carbonProperties.setProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+            CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+      }
     }
-
   }
 
   /**
@@ -362,7 +361,23 @@ public final class CarbonProperties {
    */
   public void addProperty(String key, String value) {
     carbonProperties.setProperty(key, value);
+  }
+
+  private ColumnarFormatVersion getDefaultFormatVersion() {
+    return ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+  }
 
+  public ColumnarFormatVersion getFormatVersion() {
+    String versionStr = getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
+    if (versionStr == null) {
+      return getDefaultFormatVersion();
+    } else {
+      try {
+        return ColumnarFormatVersion.valueOf(versionStr);
+      } catch (IllegalArgumentException e) {
+        return getDefaultFormatVersion();
+      }
+    }
   }
 
   /**
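
The validate-or-fallback logic above, sketched against java.util.Properties so it runs standalone. The property key literal is an assumption; the default "V2" matches the constant changed earlier in this diff.

  import java.util.Properties;
  import org.apache.carbondata.core.carbon.ColumnarFormatVersion;

  public class VersionPropertyDemo {
    static final String KEY = "carbon.data.file.version";  // illustrative key
    static final String DEFAULT_VERSION = "V2";

    static ColumnarFormatVersion resolve(Properties props) {
      String value = props.getProperty(KEY);
      if (value == null) {
        return ColumnarFormatVersion.valueOf(DEFAULT_VERSION);
      }
      try {
        return ColumnarFormatVersion.valueOf(value);           // enum's String valueOf
      } catch (IllegalArgumentException e) {
        return ColumnarFormatVersion.valueOf(DEFAULT_VERSION); // invalid input falls back
      }
    }

    public static void main(String[] args) {
      Properties props = new Properties();
      props.setProperty(KEY, "V9");        // invalid on purpose
      System.out.println(resolve(props));  // ColumnarFormatV2
    }
  }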

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 162e9b9..41594ad 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1056,11 +1056,6 @@ public final class CarbonUtil {
 
   /**
    * Below method will be used to read the data file matadata
-   *
-   * @param filePath file path
-   * @param blockOffset   offset in the file
-   * @return Data file metadata instance
-   * @throws CarbonUtilException
    */
   public static DataFileFooter readMetadatFile(TableBlockInfo tableBlockInfo)
       throws CarbonUtilException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
index ea1324e..e766e85 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -55,7 +56,7 @@ public class DataFileFooterConverter extends AbstractDataFileFooterConverter {
       CarbonFooterReader reader =
           new CarbonFooterReader(tableBlockInfo.getFilePath(), actualFooterOffset);
       FileFooter footer = reader.readFooter();
-      dataFileFooter.setVersionId((short) footer.getVersion());
+      dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
       dataFileFooter.setNumberOfRows(footer.getNum_rows());
       dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
       List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
index d971756..02de383 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverter2.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter;
@@ -46,7 +47,7 @@ public class DataFileFooterConverter2 extends AbstractDataFileFooterConverter {
     CarbonFooterReader reader =
         new CarbonFooterReader(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset());
     FileFooter footer = reader.readFooter();
-    dataFileFooter.setVersionId((short) footer.getVersion());
+    dataFileFooter.setVersionId(ColumnarFormatVersion.valueOf((short) footer.getVersion()));
     dataFileFooter.setNumberOfRows(footer.getNum_rows());
     dataFileFooter.setSegmentInfo(getSegmentInfo(footer.getSegment_info()));
     List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
index a079ad7..175a22b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataFileFooterConverterFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.util;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 
 /**
  * Factory class to get the thrift reader object based on version
@@ -49,15 +50,18 @@ public class DataFileFooterConverterFactory {
   /**
    * Method will be used to get the file footer converter instance based on version
    *
-   * @param versionNumber
+   * @param version
    * @return footer reader instance
    */
-  public AbstractDataFileFooterConverter getDataFileFooterConverter(final short versionNumber) {
-    switch (versionNumber) {
-      case 2:
+  public AbstractDataFileFooterConverter getDataFileFooterConverter(
+      final ColumnarFormatVersion version) {
+    switch (version) {
+      case V2:
         return new DataFileFooterConverter2();
-      default:
+      case V1:
         return new DataFileFooterConverter();
+      default:
+        throw new IllegalArgumentException("invalid format version: " + version);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
index eabf688..6d90a36 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/BlockInfoTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.block;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -28,7 +29,7 @@ public class BlockInfoTest {
   static BlockInfo blockInfo;
 
   @BeforeClass public static void setup() {
-    blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6,(short)1));
+    blockInfo = new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
   }
 
   @Test public void hashCodeTest() {
@@ -44,7 +45,7 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithSimilarObject() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, (short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (res);
   }
@@ -61,28 +62,28 @@ public class BlockInfoTest {
 
   @Test public void equalsTestWithDifferentSegmentId() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6,(short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "diffSegmentId", null, 6, ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentOffset() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, (short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 62, "segmentId", null, 6, ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDifferentBlockLength() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, (short)1));
+        new BlockInfo(new TableBlockInfo("filePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
     Boolean res = blockInfo.equals(blockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     BlockInfo blockInfoTest =
-        new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62, (short)1));
+        new BlockInfo(new TableBlockInfo("diffFilePath", 6, "segmentId", null, 62, ColumnarFormatVersion.V1));
     Boolean res = blockInfoTest.equals(blockInfo);
     assert (!res);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
index 1b49f83..b6669ed 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfoTest.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.carbon.datastore.block;
 import mockit.Mock;
 import mockit.MockUp;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
 
 import org.junit.BeforeClass;
@@ -34,8 +35,8 @@ public class TableBlockInfoTest {
   static TableBlockInfo tableBlockInfos;
 
   @BeforeClass public static void setup() {
-    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, (short) 1);
-    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), (short) 1);
+    tableBlockInfo = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
+    tableBlockInfos = new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
   }
 
   @Test public void equalTestWithSameObject() {
@@ -44,7 +45,7 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equalTestWithSimilarObject() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 6, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (res);
   }
@@ -60,52 +61,52 @@ public class TableBlockInfoTest {
   }
 
   @Test public void equlsTestWithDiffSegmentId() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "diffsegmentId", null, 6, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equlsTestWithDiffBlockOffset() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 6, "segmentId", null, 6, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockLength() {
-    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, (short) 1);
+    TableBlockInfo tableBlockInfoTest = new TableBlockInfo("filePath", 4, "segmentId", null, 4, ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfo.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffBlockletNumber() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("filepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void equalsTestWithDiffFilePath() {
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "segmentId", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     Boolean res = tableBlockInfos.equals(tableBlockInfoTest);
     assert (!res);
   }
 
   @Test public void compareToTestForSegmentId() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = 2;
     assertEquals(res, expectedResult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "6", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = -1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfo2 =
-        new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("difffilepath", 6, "4", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     int res2 = tableBlockInfos.compareTo(tableBlockInfo2);
     int expectedresult2 = 1;
     assertEquals(res2, expectedresult2);
@@ -130,18 +131,18 @@ public class TableBlockInfoTest {
 
     };
 
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, (short) 1);
+    TableBlockInfo tableBlockInfo = new TableBlockInfo("difffilepaths", 6, "5", null, 3, ColumnarFormatVersion.V1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedResult = -5;
     assertEquals(res, expectedResult);
 
-    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, (short) 1);
+    TableBlockInfo tableBlockInfo1 = new TableBlockInfo("filepath", 6, "5", null, 3, ColumnarFormatVersion.V1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedResult1 = 1;
     assertEquals(res1, expectedResult1);
 
     TableBlockInfo tableBlockInfoTest =
-        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), (short) 1);
+        new TableBlockInfo("filePath", 6, "5", null, 7, new BlockletInfos(6, 2, 2), ColumnarFormatVersion.V1);
     int res2 = tableBlockInfos.compareTo(tableBlockInfoTest);
     int expectedResult2 = -1;
     assertEquals(res2, expectedResult2);
@@ -149,13 +150,13 @@ public class TableBlockInfoTest {
 
   @Test public void compareToTestWithStartBlockletNo() {
     TableBlockInfo tableBlockInfo =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), (short) 1);
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 3, 2), ColumnarFormatVersion.V1);
     int res = tableBlockInfos.compareTo(tableBlockInfo);
     int expectedresult =-1;
     assertEquals(res, expectedresult);
 
     TableBlockInfo tableBlockInfo1 =
-        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), (short) 1);
+        new TableBlockInfo("filepath", 6, "5", null, 6, new BlockletInfos(6, 1, 2), ColumnarFormatVersion.V1);
     int res1 = tableBlockInfos.compareTo(tableBlockInfo1);
     int expectedresult1 = 1;
     assertEquals(res1, expectedresult1);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
index 83b62a5..e9d09b8 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfoTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.carbondata.core.carbon.datastore.block;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -34,10 +35,10 @@ public class TableTaskInfoTest {
     tableBlockInfoList = new ArrayList<>(5);
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, (short) 1));
+    tableBlockInfoList.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
 
     String[] locs = { "loc4", "loc5" };
-    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, (short) 1));
+    tableBlockInfoList.add(1, new TableBlockInfo("filepath", 2, "segmentId", locs, 6, ColumnarFormatVersion.V1));
 
     tableTaskInfo = new TableTaskInfo("taskId", tableBlockInfoList);
   }
@@ -68,10 +69,10 @@ public class TableTaskInfoTest {
     List<TableBlockInfo> tableBlockInfoListTest = new ArrayList<>();
 
     String[] locations = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, (short) 1));
+    tableBlockInfoListTest.add(0, new TableBlockInfo("filePath", 2, "segmentID", locations, 6, ColumnarFormatVersion.V1));
 
     String[] locations1 = { "loc1", "loc2", "loc3" };
-    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, (short) 1));
+    tableBlockInfoListTest.add(1, new TableBlockInfo("filePath", 2, "segmentID", locations1, 6, ColumnarFormatVersion.V1));
 
     List<String> res = TableTaskInfo.maxNoNodes(tableBlockInfoListTest);
     assert (res.equals(locs));

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index d959a5c..be270e4 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -166,7 +166,7 @@ public class CarbonMetadataUtilTest {
     segmentInfo.setNum_cols(0);
     segmentInfo.setColumn_cardinalities(CarbonUtil.convertToIntegerList(columnCardinality));
     IndexHeader indexHeader = new IndexHeader();
-    indexHeader.setVersion(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);
+    indexHeader.setVersion(2);
     indexHeader.setSegment_info(segmentInfo);
     indexHeader.setTable_columns(columnSchemaList);
     IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index c0d890c..869c8cc 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -21,6 +21,7 @@ package org.apache.carbondata.core.util;
 import mockit.Mock;
 import mockit.MockUp;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.datastore.chunk.DimensionChunkAttributes;
 import org.apache.carbondata.core.carbon.datastore.chunk.impl.FixedLengthDimensionDataChunk;
@@ -555,18 +556,18 @@ public class CarbonUtilTest {
       @SuppressWarnings("unused") @Mock
       public DataFileFooter readDataFileFooter(TableBlockInfo info) {
         DataFileFooter fileFooter = new DataFileFooter();
-        fileFooter.setVersionId((short)1);
+        fileFooter.setVersionId(ColumnarFormatVersion.V1);
         return fileFooter;
       }
     };
-    TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, (short)1);
+    TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
     
-    assertEquals(CarbonUtil.readMetadatFile(info).getVersionId(), 1);
+    assertEquals(CarbonUtil.readMetadatFile(info).getVersionId().number(), 1);
   }
 
   @Test(expected = CarbonUtilException.class) public void testToReadMetadatFileWithException()
       throws Exception {
-	TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, (short)1);
+	TableBlockInfo info = new TableBlockInfo("file:/", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
     CarbonUtil.readMetadatFile(info);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
index 62d1ac7..b7c48d7 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataFileFooterConverterTest.java
@@ -22,6 +22,7 @@ package org.apache.carbondata.core.util;
 import mockit.Mock;
 import mockit.MockUp;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockInfo;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.metadata.blocklet.BlockletInfo;
@@ -152,7 +153,7 @@ public class DataFileFooterConverterTest {
       }
     };
     String[] arr = { "a", "b", "c" };
-    TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3, (short) 1);
+    TableBlockInfo tableBlockInfo = new TableBlockInfo("file", 3, "id", arr, 3, ColumnarFormatVersion.V1);
     tableBlockInfo.getBlockletInfos().setNoOfBlockLets(3);
     List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
     tableBlockInfoList.add(tableBlockInfo);
@@ -254,7 +255,7 @@ public class DataFileFooterConverterTest {
     segmentInfo.setNumberOfColumns(segmentInfo1.getNum_cols());
     dataFileFooter.setNumberOfRows(3);
     dataFileFooter.setSegmentInfo(segmentInfo);
-    TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, (short)1);
+    TableBlockInfo info = new TableBlockInfo("file", 1, "0", new String[0], 1, ColumnarFormatVersion.V1);
     DataFileFooter result = dataFileFooterConverter.readDataFileFooter(info);
     assertEquals(result.getNumberOfRows(), 3);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
index 9102c78..75fdd1c 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -26,7 +26,7 @@ object CarbonExample {
 
   def main(args: Array[String]): Unit = {
     // to run the example, plz change this path to your local machine path
-    val rootPath = "/Users/wangfei/code/incubator-carbondata"
+    val rootPath = "/Users/jackylk/code/incubator-carbondata"
     val spark = SparkSession
         .builder()
         .master("local")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 8b453c7..e707c4e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.DataRefNode;
 import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
@@ -254,8 +255,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       if (segmentId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
         continue;
       }
-      carbonSplits.add(CarbonInputSplit
-          .from(segmentId, fileSplit, CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION));
+      carbonSplits.add(CarbonInputSplit.from(segmentId, fileSplit,
+              ColumnarFormatVersion.valueOf(
+                  CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION)));
     }
     return carbonSplits;
   }
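
Since CARBON_DATA_FILE_DEFAULT_VERSION is now the string "V2", promoting the default to the enum goes through the generated valueOf(String) rather than the short overload:

  import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
  import org.apache.carbondata.core.constants.CarbonCommonConstants;

  public class DefaultVersionDemo {
    public static void main(String[] args) {
      ColumnarFormatVersion v = ColumnarFormatVersion.valueOf(
          CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION);  // "V2"
      System.out.println(v.number());  // 2
    }
  }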

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index efc4f77..8b87cad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -25,11 +25,12 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
 import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.carbon.path.CarbonTablePath;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.internal.index.Block;
 
 import org.apache.hadoop.fs.Path;
@@ -55,17 +56,18 @@ public class CarbonInputSplit extends FileSplit
    */
   private int numberOfBlocklets;
 
-  private short version = CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION;
+  private ColumnarFormatVersion version;
 
   public CarbonInputSplit() {
     segmentId = null;
     taskId = "0";
     numberOfBlocklets = 0;
     invalidSegments = new ArrayList<>();
+    version = CarbonProperties.getInstance().getFormatVersion();
   }
 
   private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      short version) {
+      ColumnarFormatVersion version) {
     super(path, start, length, locations);
     this.segmentId = segmentId;
     this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
@@ -74,12 +76,13 @@ public class CarbonInputSplit extends FileSplit
   }
 
   public CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations,
-      int numberOfBlocklets, short version) {
+      int numberOfBlocklets, ColumnarFormatVersion version) {
     this(segmentId, path, start, length, locations, version);
     this.numberOfBlocklets = numberOfBlocklets;
   }
 
-  public static CarbonInputSplit from(String segmentId, FileSplit split, short version)
+  public static CarbonInputSplit from(String segmentId, FileSplit split,
+      ColumnarFormatVersion version)
       throws IOException {
     return new CarbonInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
         split.getLocations(), version);
@@ -120,7 +123,7 @@ public class CarbonInputSplit extends FileSplit
   @Override public void readFields(DataInput in) throws IOException {
     super.readFields(in);
     this.segmentId = in.readUTF();
-    this.version = in.readShort();
+    this.version = ColumnarFormatVersion.valueOf(in.readShort());
     int numInvalidSegment = in.readInt();
     invalidSegments = new ArrayList<>(numInvalidSegment);
     for (int i = 0; i < numInvalidSegment; i++) {
@@ -131,7 +134,7 @@ public class CarbonInputSplit extends FileSplit
   @Override public void write(DataOutput out) throws IOException {
     super.write(out);
     out.writeUTF(segmentId);
-    out.writeShort(version);
+    out.writeShort(version.number());
     out.writeInt(invalidSegments.size());
     for (String invalidSegment : invalidSegments) {
       out.writeUTF(invalidSegment);
@@ -155,11 +158,11 @@ public class CarbonInputSplit extends FileSplit
     return numberOfBlocklets;
   }
 
-  public short getVersion() {
+  public ColumnarFormatVersion getVersion() {
     return version;
   }
 
-  public void setVersion(short version) {
+  public void setVersion(ColumnarFormatVersion version) {
     this.version = version;
   }
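
On the wire the split still carries a bare short; only the in-memory type changes. The same round-trip over plain data streams:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import org.apache.carbondata.core.carbon.ColumnarFormatVersion;

  public class SplitVersionWireDemo {
    public static void main(String[] args) throws IOException {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeShort(ColumnarFormatVersion.V1.number());  // as in write() above

      DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
      ColumnarFormatVersion version = ColumnarFormatVersion.valueOf(in.readShort());
      System.out.println(version);  // ColumnarFormatV1
    }
  }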
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1801408..e5eb78a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -801,14 +801,13 @@ object CarbonDataRDDFactory {
           val jobContext = new Job(hadoopConfiguration)
           val rawSplits = inputFormat.getSplits(jobContext).toArray
           val result = new Array[Partition](rawSplits.size)
-          val blockList = rawSplits.map(inputSplit => {
+          val blockList = rawSplits.map { inputSplit =>
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
               fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength, 0
+              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
             ).asInstanceOf[Distributable]
           }
-          )
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
index 431180c..749a6e8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestQueryWithOldCarbonDataFile.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
  */
 class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
-	  CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");         
+	  CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1");
     sql("drop table if exists OldFormatTable")
     sql("drop table if exists OldFormatTableHIVE")
      sql("""
@@ -47,14 +47,14 @@ class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
            name String, phonetype String, serialname String, salary Int)
           row format delimited fields terminated by ','
            """)      
-    sql("LOAD DATA local inpath './src/test/resources/OLDFORMATTABLE.csv' INTO table OldFormatTable");       
+    sql("LOAD DATA local inpath './src/test/resources/OLDFORMATTABLE.csv' INTO table OldFormatTable")
    sql(s"""
            LOAD DATA LOCAL INPATH './src/test/resources/OLDFORMATTABLEHIVE.csv' into table OldFormatTableHIVE
            """)
 
   }
 
-  CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "2");
+  CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V2")
   test("Test select * query") {
     checkAnswer(
       sql("select * from OldFormatTable"), sql("select * from OldFormatTableHIVE")
@@ -62,7 +62,7 @@ class TestQueryWithOldCarbonDataFile extends QueryTest with BeforeAndAfterAll {
   }
 
   override def afterAll {
-     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "1");
+     CarbonProperties.getInstance.addProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION, "V1")
     sql("drop table if exists OldFormatTable")
     sql("drop table if exists OldFormatTableHIVE")
   }
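
The property value switches from the bare number ("1"/"2") to the enum name
("V1"/"V2"). A hedged illustration of how such a string could be resolved back
to the enum (the actual parsing lives behind CarbonProperties.getFormatVersion()
and is assumed here):

  // Assumed resolution of the property string; valueOf rejects anything
  // other than the declared constant names "V1"/"V2".
  String value = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION); // e.g. "V1"
  ColumnarFormatVersion version = ColumnarFormatVersion.valueOf(value);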

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index cfae186..21c0fa7 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
+import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
@@ -810,14 +810,13 @@ object CarbonDataRDDFactory {
           }
           val jobContext = new Job(hadoopConfiguration)
           val rawSplits = inputFormat.getSplits(jobContext).toArray
-          val blockList = rawSplits.map(inputSplit => {
+          val blockList = rawSplits.map { inputSplit =>
             val fileSplit = inputSplit.asInstanceOf[FileSplit]
             new TableBlockInfo(fileSplit.getPath.toString,
               fileSplit.getStart, "1",
-              fileSplit.getLocations, fileSplit.getLength
+              fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1
             ).asInstanceOf[Distributable]
           }
-          )
           // group blocks to nodes, tasks
           val startTime = System.currentTimeMillis
           val activeNodes = DistributionUtil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
index 2fbb00e..047ac0d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonDataWriterFactory.java
@@ -16,12 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.carbondata.processing.store;
 
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImpl2;
-import org.apache.carbondata.processing.store.writer.CarbonFactDataWriterImplForIntIndexAndAggBlock;
+import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1;
+import org.apache.carbondata.processing.store.writer.v2.CarbonFactDataWriterImplV2;
 
 /**
  * Factory class to get the writer instance
@@ -57,13 +59,15 @@ public class CarbonDataWriterFactory {
    * @param carbonDataWriterVo writer vo object
    * @return writer instance
    */
-  public CarbonFactDataWriter<?> getFactDataWriter(final short version,
+  public CarbonFactDataWriter<?> getFactDataWriter(final ColumnarFormatVersion version,
       final CarbonDataWriterVo carbonDataWriterVo) {
     switch (version) {
-      case 2:
-        return new CarbonFactDataWriterImpl2(carbonDataWriterVo);
+      case V2:
+        return new CarbonFactDataWriterImplV2(carbonDataWriterVo);
+      case V1:
+        return new CarbonFactDataWriterImplV1(carbonDataWriterVo);
       default:
-        return new CarbonFactDataWriterImplForIntIndexAndAggBlock(carbonDataWriterVo);
+        throw new IllegalArgumentException("invalid format version: " + version);
     }
   }
 
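
A short usage sketch of the reworked factory, mirroring the call site in
CarbonFactDataHandlerColumnar below (dataWriterVo stands in for a real
CarbonDataWriterVo instance):

  // Version-driven writer selection; unknown versions now fail fast.
  ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
  CarbonFactDataWriter<?> writer = CarbonDataWriterFactory.getInstance()
      .getFactDataWriter(version, dataWriterVo);

Note the tightened default: an unrecognized version now raises
IllegalArgumentException instead of silently falling back to the old int-index
writer, so configuration mistakes surface immediately.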

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index e560784..c961700 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.ColumnarFormatVersion;
 import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
@@ -1400,8 +1401,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @return data writer instance
    */
   private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) {
-    short version = Short.parseShort(
-        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
+    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
     return CarbonDataWriterFactory.getInstance()
         .getFactDataWriter(version, getDataWriterVo(keyBlockSize));
   }
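
CarbonProperties.getFormatVersion() replaces the ad-hoc Short.parseShort of the
raw property value. Its body is not part of this hunk; a plausible sketch, with
the fallback default as an assumption:

  // Hypothetical sketch of CarbonProperties.getFormatVersion().
  public ColumnarFormatVersion getFormatVersion() {
    String version = getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION);
    if (version == null) {
      // assumed fallback when the property is unset
      return ColumnarFormatVersion.V2;
    }
    return ColumnarFormatVersion.valueOf(version); // "V1" -> V1, "V2" -> V2
  }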

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0ef3fb81/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
deleted file mode 100644
index d399280..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriterImpl2.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.store.writer;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
-import org.apache.carbondata.core.util.CarbonMetadataUtil;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.writer.CarbonFooterWriter;
-import org.apache.carbondata.format.DataChunk2;
-import org.apache.carbondata.format.FileFooter;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-
-/**
- * Below class will be used to write the data in version 2 format
- */
-public class CarbonFactDataWriterImpl2 extends CarbonFactDataWriterImplForIntIndexAndAggBlock {
-
-  /**
-   * logger
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonFactDataWriterImpl2.class.getName());
-
-  /**
-   * Constructor to create an instance of this class
-   *
-   * @param dataWriterVo
-   */
-  public CarbonFactDataWriterImpl2(CarbonDataWriterVo dataWriterVo) {
-    super(dataWriterVo);
-  }
-
-  /**
-   * Below method will be used to write the data to carbon data file
-   *
-   * @param holder
-   * @throws CarbonDataWriterException any problem in writing operation
-   */
-  @Override public void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException {
-    // size to calculate the size of the blocklet
-    int size = 0;
-    // get the blocklet info object
-    BlockletInfoColumnar blockletInfo = getBlockletInfo(holder, 0);
-
-    List<DataChunk2> datachunks = null;
-    try {
-      // get all the data chunks
-      datachunks = CarbonMetadataUtil
-          .getDatachunk2(blockletInfo, thriftColumnSchemaList, dataWriterVo.getSegmentProperties());
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while getting the data chunks", e);
-    }
-    // data chunk byte array
-    byte[][] dataChunkByteArray = new byte[datachunks.size()][];
-    for (int i = 0; i < dataChunkByteArray.length; i++) {
-      dataChunkByteArray[i] = CarbonUtil.getByteArray(datachunks.get(i));
-      // add the data chunk size
-      size += dataChunkByteArray[i].length;
-    }
-    // add row id index length
-    for (int i = 0; i < holder.getKeyBlockIndexLength().length; i++) {
-      size += holder.getKeyBlockIndexLength()[i];
-    }
-    // add rle index length
-    for (int i = 0; i < holder.getDataIndexMapLength().length; i++) {
-      size += holder.getDataIndexMapLength()[i];
-    }
-    // add dimension column data page and measure column data page size
-    long blockletDataSize =
-        holder.getTotalDimensionArrayLength() + holder.getTotalMeasureArrayLength() + size;
-    // if size of the file already reached threshold size then create a new file and get the file
-    // channel object
-    updateBlockletFileChannel(blockletDataSize);
-    // write the version header to the file if the current file size is zero;
-    // this is done so the carbondata file can be read on its own
-    try {
-      if (fileChannel.size() == 0) {
-        short version = Short.parseShort(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.CARBON_DATA_FILE_VERSION));
-        byte[] header = (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version).getBytes();
-        ByteBuffer buffer = ByteBuffer.allocate(header.length);
-        buffer.put(header);
-        buffer.rewind();
-        fileChannel.write(buffer);
-      }
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while getting the file channel size", e);
-    }
-    // write data to file and get its offset
-    writeDataToFile(holder, dataChunkByteArray, fileChannel);
-    // add blocklet info to list
-    blockletInfoList.add(blockletInfo);
-    LOGGER.info("A new blocklet is added, its data size is: " + blockletDataSize + " Byte");
-  }
-
-  /**
-   * Below method will be used to write the data to file
-   * Data Format
-   * <DColumn1DataChunk><DColumnDataPage><DColumnRle>
-   * <DColumn2DataChunk><DColumn2DataPage><DColumn2RowIds><DColumn2Rle>
-   * <DColumn3DataChunk><DColumn3DataPage><column3RowIds>
-   * <MColumn1DataChunk><MColumn1DataPage>
-   * <MColumn2DataChunk><MColumn2DataPage>
-   * <MColumn2DataChunk><MColumn2DataPage>
-   *
-   * @param nodeHolder
-   * @param dataChunksBytes
-   * @param channel
-   * @throws CarbonDataWriterException
-   */
-  private void writeDataToFile(NodeHolder nodeHolder, byte[][] dataChunksBytes, FileChannel channel)
-      throws CarbonDataWriterException {
-    long offset = 0;
-    try {
-      offset = channel.size();
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while getting the file channel size");
-    }
-    List<Long> currentDataChunksOffset = new ArrayList<>();
-    List<Short> currentDataChunksLength = new ArrayList<>();
-    dataChunksLength.add(currentDataChunksLength);
-    dataChunksOffsets.add(currentDataChunksOffset);
-    int bufferSize = 0;
-    int rowIdIndex = 0;
-    int rleIndex = 0;
-    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
-      currentDataChunksOffset.add(offset);
-      currentDataChunksLength.add((short) dataChunksBytes[i].length);
-      bufferSize += dataChunksBytes[i].length + nodeHolder.getKeyLengths()[i] + (!nodeHolder
-          .getIsSortedKeyBlock()[i] ? nodeHolder.getKeyBlockIndexLength()[rowIdIndex] : 0) + (
-          dataWriterVo.getAggBlocks()[i] ?
-              nodeHolder.getCompressedDataIndex()[rleIndex].length :
-              0);
-      offset += dataChunksBytes[i].length;
-      offset += nodeHolder.getKeyLengths()[i];
-      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
-        offset += nodeHolder.getKeyBlockIndexLength()[rowIdIndex];
-        rowIdIndex++;
-      }
-      if (dataWriterVo.getAggBlocks()[i]) {
-        offset += nodeHolder.getDataIndexMapLength()[rleIndex];
-        rleIndex++;
-      }
-    }
-    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
-    rleIndex = 0;
-    rowIdIndex = 0;
-    for (int i = 0; i < nodeHolder.getIsSortedKeyBlock().length; i++) {
-      buffer.put(dataChunksBytes[i]);
-      buffer.put(nodeHolder.getKeyArray()[i]);
-      if (!nodeHolder.getIsSortedKeyBlock()[i]) {
-        buffer.putInt(nodeHolder.getCompressedIndex()[rowIdIndex].length);
-        buffer.put(nodeHolder.getCompressedIndex()[rowIdIndex]);
-        if (nodeHolder.getCompressedIndexMap()[rowIdIndex].length > 0) {
-          buffer.put(nodeHolder.getCompressedIndexMap()[rowIdIndex]);
-        }
-        rowIdIndex++;
-      }
-      if (dataWriterVo.getAggBlocks()[i]) {
-        buffer.put(nodeHolder.getCompressedDataIndex()[rleIndex]);
-        rleIndex++;
-      }
-    }
-    try {
-      buffer.flip();
-      channel.write(buffer);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(
-          "Problem while writing the dimension data in carbon data file", e);
-    }
-
-    int dataChunkIndex = nodeHolder.getKeyArray().length;
-    int totalLength = 0;
-    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
-      currentDataChunksOffset.add(offset);
-      currentDataChunksLength.add((short) dataChunksBytes[dataChunkIndex].length);
-      offset += dataChunksBytes[dataChunkIndex].length;
-      offset += nodeHolder.getDataArray()[i].length;
-      totalLength += dataChunksBytes[dataChunkIndex].length;
-      totalLength += nodeHolder.getDataArray()[i].length;
-      dataChunkIndex++;
-    }
-    buffer = ByteBuffer.allocate(totalLength);
-    dataChunkIndex = nodeHolder.getKeyArray().length;
-    for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
-      buffer.put(dataChunksBytes[dataChunkIndex++]);
-      buffer.put(nodeHolder.getDataArray()[i]);
-    }
-    try {
-      buffer.flip();
-      channel.write(buffer);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException(
-          "Problem while writing the measure data in carbon data file", e);
-    }
-  }
-
-  /**
-   * This method will be used to get the blocklet metadata
-   *
-   * @return BlockletInfo - blocklet metadata
-   */
-  protected BlockletInfoColumnar getBlockletInfo(NodeHolder nodeHolder, long offset) {
-    // create the info object for leaf entry
-    BlockletInfoColumnar info = new BlockletInfoColumnar();
-    //add aggBlocks array
-    info.setAggKeyBlock(nodeHolder.getAggBlocks());
-    // add total entry count
-    info.setNumberOfKeys(nodeHolder.getEntryCount());
-
-    // add the key array length
-    info.setKeyLengths(nodeHolder.getKeyLengths());
-    // adding null measure index bit set
-    info.setMeasureNullValueIndex(nodeHolder.getMeasureNullValueIndex());
-    //add column min max length
-    info.setColumnMaxData(nodeHolder.getColumnMaxData());
-    info.setColumnMinData(nodeHolder.getColumnMinData());
-
-    // add measure length
-    info.setMeasureLength(nodeHolder.getMeasureLenght());
-
-    info.setIsSortedKeyColumn(nodeHolder.getIsSortedKeyBlock());
-    info.setKeyBlockIndexLength(nodeHolder.getKeyBlockIndexLength());
-    info.setDataIndexMapLength(nodeHolder.getDataIndexMapLength());
-    // set startkey
-    info.setStartKey(nodeHolder.getStartKey());
-    // set end key
-    info.setEndKey(nodeHolder.getEndKey());
-    info.setCompressionModel(nodeHolder.getCompressionModel());
-    // return leaf metadata
-
-    //colGroup Blocks
-    info.setColGrpBlocks(nodeHolder.getColGrpBlocks());
-
-    return info;
-  }
-
-  /**
- * This method will write the blocklet metadata at the end of the file in thrift format
-   */
-  protected void writeBlockletInfoToFile(List<BlockletInfoColumnar> infoList, FileChannel channel,
-      String filePath) throws CarbonDataWriterException {
-    try {
-      // get the current file position
-      long currentPosition = channel.size();
-      CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
-      // get thrift file footer instance
-      FileFooter convertFileMeta = CarbonMetadataUtil
-          .convertFilterFooter2(infoList, localCardinality, thriftColumnSchemaList,
-              dataChunksOffsets, dataChunksLength);
-      // fill the carbon index details
-      fillBlockIndexInfoDetails(infoList, convertFileMeta.getNum_rows(), filePath, currentPosition);
-      // write the footer
-      writer.writeFooter(convertFileMeta, currentPosition);
-    } catch (IOException e) {
-      throw new CarbonDataWriterException("Problem while writing the carbon file: ", e);
-    }
-  }
-}
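
The version-header step from the deleted writeBlockletData above carries over to
the new v2 writer; a hedged sketch of the same logic expressed through the enum
(the exact code in CarbonFactDataWriterImplV2 is not shown in this commit, and
version.number() is the assumed numeric accessor from the enum sketch earlier):

  // Write the version header once, while the file is still empty, so the
  // carbondata file can be identified on its own.
  if (fileChannel.size() == 0) {
    ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
    byte[] header =
        (CarbonCommonConstants.CARBON_DATA_VERSION_HEADER + version.number()).getBytes();
    ByteBuffer buffer = ByteBuffer.allocate(header.length);
    buffer.put(header);
    buffer.rewind();
    fileChannel.write(buffer);
  }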