You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/10 01:17:03 UTC

[1/5] carbondata git commit: refactor write step based on ColumnPage

Repository: carbondata
Updated Branches:
  refs/heads/12-dev b72a90e06 -> a161db4e2


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 8bf4759..1500eb0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -25,6 +25,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -48,9 +49,12 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.datastore.impl.data.compressed.HeavyCompressedDoubleArrayDataStore;
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
+import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -58,6 +62,7 @@ import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengt
 import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -67,10 +72,6 @@ import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
-import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
-import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
-import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
-import org.apache.carbondata.processing.store.colgroup.DataHolder;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -116,7 +117,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private int mdKeyIndex;
   /**
-   * blocklet size
+   * blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
+   * once this size of input is reached
    */
   private int blockletSize;
   /**
@@ -140,14 +142,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private int tableBlockSize;
   /**
-   * otherMeasureIndex
-   */
-  private int[] otherMeasureIndex;
-  /**
-   * customMeasureIndex
-   */
-  private int[] customMeasureIndex;
-  /**
    * dimLens
    */
   private int[] dimLens;
@@ -161,18 +155,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private CarbonKeyBlockHolder[] keyBlockHolder;
   private boolean[] aggKeyBlock;
   private boolean[] isNoDictionary;
-  private boolean isAggKeyBlock;
   private long processedDataCount;
-  /**
-   * thread pool size to be used for block sort
-   */
-  private int thread_pool_size;
   private KeyGenerator[] complexKeyGenerator;
-  /**
-   * isDataWritingRequest
-   */
-  //    private boolean isDataWritingRequest;
-
   private ExecutorService producerExecutorService;
   private List<Future<Void>> producerExecutorServiceTaskList;
   private ExecutorService consumerExecutorService;
@@ -181,7 +165,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private int noDictionaryCount;
   private ColumnGroupModel colGrpModel;
   private int[] primitiveDimLens;
-  private char[] type;
+  private DataType[] type;
   private int[] completeDimLens;
   private boolean[] isUseInvertedIndex;
   /**
@@ -201,10 +185,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private BlockletDataHolder blockletDataHolder;
   /**
-   * a private class which will take each blocklet in order and write to a file
-   */
-  private Consumer consumer;
-  /**
    * number of cores configured
    */
   private int numberOfCores;
@@ -227,20 +207,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private int complexColCount;
 
   /**
-   * no of column blocks
-   */
-  private int columnStoreCount;
-
-  /**
    * column schema present in the table
    */
   private List<ColumnSchema> wrapperColumnSchemaList;
 
   /**
    * boolean to check whether dimension
-   * is of dictionary type or no dictionary time
+   * is of dictionary type or no dictionary type
    */
-  private boolean[] dimensionType;
+  private boolean[] isDictDimension;
 
   /**
    * colCardinality for the merge case.
@@ -260,8 +235,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   private long schemaUpdatedTimeStamp;
 
-  private String segmentId;
-
   private int taskExtension;
 
   /**
@@ -277,19 +250,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dimensionCount = carbonFactDataHandlerModel.getDimensionCount();
     this.complexIndexMap = carbonFactDataHandlerModel.getComplexIndexMap();
     this.primitiveDimLens = carbonFactDataHandlerModel.getPrimitiveDimLens();
-    this.isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
-            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
     this.carbonDataDirectoryPath = carbonFactDataHandlerModel.getCarbonDataDirectoryPath();
-    this.complexColCount = getComplexColsCount();
-    this.columnStoreCount =
-        this.colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount;
+    this.complexColCount = getExpandedComplexColsCount();
 
-    this.aggKeyBlock = new boolean[columnStoreCount];
-    this.isNoDictionary = new boolean[columnStoreCount];
+    int numDimColumns = colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount;
+    this.aggKeyBlock = new boolean[numDimColumns];
+    this.isNoDictionary = new boolean[numDimColumns];
     this.bucketNumber = carbonFactDataHandlerModel.getBucketId();
     this.taskExtension = carbonFactDataHandlerModel.getTaskExtension();
-    this.isUseInvertedIndex = new boolean[columnStoreCount];
+    this.isUseInvertedIndex = new boolean[numDimColumns];
     if (null != carbonFactDataHandlerModel.getIsUseInvertedIndex()) {
       for (int i = 0; i < isUseInvertedIndex.length; i++) {
         if (i < carbonFactDataHandlerModel.getIsUseInvertedIndex().length) {
@@ -305,6 +274,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       this.isNoDictionary[noDictStartIndex + i] = true;
     }
 
+    boolean isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
+            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
     if (isAggKeyBlock) {
       int noDictionaryValue = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
@@ -346,7 +318,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.databaseName = carbonFactDataHandlerModel.getDatabaseName();
     this.tableBlockSize = carbonFactDataHandlerModel.getBlockSizeInMB();
     this.tableName = carbonFactDataHandlerModel.getTableName();
-    this.type = carbonFactDataHandlerModel.getAggType();
+    this.type = carbonFactDataHandlerModel.getMeasureDataType();
     this.segmentProperties = carbonFactDataHandlerModel.getSegmentProperties();
     this.wrapperColumnSchemaList = carbonFactDataHandlerModel.getWrapperColumnSchema();
     this.colCardinality = carbonFactDataHandlerModel.getColCardinality();
@@ -360,11 +332,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dimLens = this.segmentProperties.getDimColumnsCardinality();
     this.carbonDataFileAttributes = carbonFactDataHandlerModel.getCarbonDataFileAttributes();
     this.schemaUpdatedTimeStamp = carbonFactDataHandlerModel.getSchemaUpdatedTimeStamp();
-    this.segmentId = carbonFactDataHandlerModel.getSegmentId();
+
     //TODO need to pass carbon table identifier to metadata
     CarbonTable carbonTable = CarbonMetadata.getInstance()
         .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    dimensionType =
+    isDictDimension =
         CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(tableName));
 
     this.compactionFlow = carbonFactDataHandlerModel.isCompactionFlow();
@@ -403,15 +375,17 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     consumerExecutorServiceTaskList = new ArrayList<>(1);
     semaphore = new Semaphore(numberOfCores);
     blockletDataHolder = new BlockletDataHolder();
-    consumer = new Consumer(blockletDataHolder);
+
+    // Start the consumer which will take each blocklet/page in order and write to a file
+    Consumer consumer = new Consumer(blockletDataHolder);
     consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
   }
 
   private boolean[] arrangeUniqueBlockType(boolean[] aggKeyBlock) {
     int counter = 0;
     boolean[] uniqueBlock = new boolean[aggKeyBlock.length];
-    for (int i = 0; i < dimensionType.length; i++) {
-      if (dimensionType[i]) {
+    for (int i = 0; i < isDictDimension.length; i++) {
+      if (isDictDimension[i]) {
         uniqueBlock[i] = aggKeyBlock[counter++];
       } else {
         uniqueBlock[i] = false;
@@ -463,8 +437,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     if (this.entryCount == this.blockletSize) {
       try {
         semaphore.acquire();
-        producerExecutorServiceTaskList.add(producerExecutorService.submit(
-            new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)));
+
+        producerExecutorServiceTaskList.add(
+            producerExecutorService.submit(
+                new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
+            )
+        );
         blockletProcessingCount.incrementAndGet();
         // set the entry count to zero
         processedDataCount += entryCount;
@@ -478,414 +456,265 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
   }
 
-  /** statics for one blocklet/page */
-  class Statistics {
-    /** min and max value of the measures */
-    Object[] min, max;
-
-    /**
-     * the unique value is the non-exist value in the row,
-     * and will be used as storage key for null values of measures
-     */
-    Object[] uniqueValue;
-
-    /** decimal count of the measures */
-    int[] decimal;
-
-    Statistics(int measureCount) {
-      max = new Object[measureCount];
-      min = new Object[measureCount];
-      uniqueValue = new Object[measureCount];
-      decimal = new int[measureCount];
-      for (int i = 0; i < measureCount; i++) {
-        if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          max[i] = Long.MIN_VALUE;
-          min[i] = Long.MAX_VALUE;
-          uniqueValue[i] = Long.MIN_VALUE;
-        } else if (type[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          max[i] = Double.MIN_VALUE;
-          min[i] = Double.MAX_VALUE;
-          uniqueValue[i] = Double.MIN_VALUE;
-        } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          max[i] = new BigDecimal(Double.MIN_VALUE);
-          min[i] = new BigDecimal(Double.MAX_VALUE);
-          uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
-        } else {
-          max[i] = 0.0;
-          min[i] = 0.0;
-          uniqueValue[i] = 0.0;
-        }
-        decimal[i] = 0;
-      }
-    }
-
-    /**
-     * update the statistics for the input row
-     */
-    void update(int[] msrIndex, Object[] row, boolean compactionFlow) {
-      // Update row level min max
-      for (int i = 0; i < msrIndex.length; i++) {
-        int count = msrIndex[i];
-        if (row[count] != null) {
-          if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            double value = (double) row[count];
-            double maxVal = (double) max[count];
-            double minVal = (double) min[count];
-            max[count] = (maxVal > value ? max[count] : value);
-            min[count] = (minVal < value ? min[count] : value);
-            int num = getDecimalCount(value);
-            decimal[count] = (decimal[count] > num ? decimal[count] : num);
-            uniqueValue[count] = (double) min[count] - 1;
-          } else if (type[count] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            long value = (long) row[count];
-            long maxVal = (long) max[count];
-            long minVal = (long) min[count];
-            max[count] = (maxVal > value ? max[count] : value);
-            min[count] = (minVal < value ? min[count] : value);
-            uniqueValue[count] = (long) min[count] - 1;
-          } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-            byte[] buff = null;
-            // in compaction flow the measure with decimal type will come as spark decimal.
-            // need to convert it to byte array.
-            if (compactionFlow) {
-              BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
-              buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
-            } else {
-              buff = (byte[]) row[count];
-            }
-            BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
-            decimal[count] = value.scale();
-            BigDecimal val = (BigDecimal) min[count];
-            uniqueValue[count] = (val.subtract(new BigDecimal(1.0)));
-          }
-        }
-      }
-    }
-  }
-
   class IndexKey {
+    private int pageSize;
     byte[] currentMDKey = null;
     byte[][] currentNoDictionaryKey = null;
     byte[] startKey = null;
     byte[] endKey = null;
     byte[][] noDictStartKey = null;
     byte[][] noDictEndKey = null;
+    byte[] packedNoDictStartKey = null;
+    byte[] packedNoDictEndKey = null;
+
+    IndexKey(int pageSize) {
+      this.pageSize = pageSize;
+    }
 
     /** update all keys based on the input row */
-    void update(Object[] row, boolean firstRow) {
+    void update(int rowId, Object[] row) {
       currentMDKey = (byte[]) row[mdKeyIndex];
       if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
         currentNoDictionaryKey = (byte[][]) row[mdKeyIndex - 1];
       }
-      if (firstRow) {
+      if (rowId == 0) {
         startKey = currentMDKey;
         noDictStartKey = currentNoDictionaryKey;
       }
       endKey = currentMDKey;
       noDictEndKey = currentNoDictionaryKey;
-    }
-  }
-
-  /** generate the NodeHolder from the input rows */
-  private NodeHolder processDataRows(List<Object[]> dataRows)
-      throws CarbonDataWriterException {
-    // to store index of the measure columns which are null
-    BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
-    // statistics for one blocklet/page
-    Statistics stats = new Statistics(measureCount);
-    IndexKey keys = new IndexKey();
-
-    // initialize measureHolder, mdKeyHolder and noDictionaryHolder, these three Holders
-    // are the input for final encoding
-    CarbonWriteDataHolder[] measureHolder = initialiseDataHolder(dataRows.size());
-    CarbonWriteDataHolder mdKeyHolder = initialiseKeyBlockHolder(dataRows.size());
-    CarbonWriteDataHolder noDictionaryHolder = null;
-    if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
-    }
-
-    // loop on the input rows, fill measureHolder, mdKeyHolder and noDictionaryHolder
-    for (int count = 0; count < dataRows.size(); count++) {
-      Object[] row = dataRows.get(count);
-      keys.update(row, (count == 0));
-      if (keys.currentMDKey.length > 0) {
-        mdKeyHolder.setWritableByteArrayValueByIndex(count, keys.currentMDKey);
+      if (rowId == pageSize - 1) {
+        finalizeKeys();
       }
-      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-        noDictionaryHolder.setWritableNonDictByteArrayValueByIndex(count,
-            keys.currentNoDictionaryKey);
-      }
-      fillMeasureHolder(row, count, measureHolder, nullValueIndexBitSet);
-      stats.update(otherMeasureIndex, row, compactionFlow);
-      stats.update(customMeasureIndex, row, compactionFlow);
-    }
-
-    // generate encoded byte array for 3 holders
-    // for measure columns: encode and compress the measureHolder
-    WriterCompressModel compressionModel =
-        ValueCompressionUtil.getWriterCompressModel(
-            stats.max, stats.min, stats.decimal, stats.uniqueValue, type, new byte[measureCount]);
-    byte[][] encodedMeasureArray =
-        HeavyCompressedDoubleArrayDataStore.encodeMeasureDataArray(
-            compressionModel, measureHolder);
-
-    // for mdkey and noDictionary, it is already in bytes, just get the array from holder
-    byte[][] mdKeyArray = mdKeyHolder.getByteArrayValues();
-    byte[][][] noDictionaryArray = null;
-    if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryArray = noDictionaryHolder.getNonDictByteArrayValues();
     }
 
-    // create NodeHolder using these encoded byte arrays
-    NodeHolder nodeHolder =
-        createNodeHolderObjectWithOutKettle(
-            encodedMeasureArray, mdKeyArray, noDictionaryArray, dataRows.size(),
-            keys.startKey, keys.endKey, compressionModel, keys.noDictStartKey, keys.noDictEndKey,
-            nullValueIndexBitSet);
-    LOGGER.info("Number Of records processed: " + dataRows.size());
-    return nodeHolder;
-  }
-
-  private void fillMeasureHolder(Object[] row, int count, CarbonWriteDataHolder[] measureHolder,
-      BitSet[] nullValueIndexBitSet) {
-    for (int k = 0; k < otherMeasureIndex.length; k++) {
-      if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        if (null == row[otherMeasureIndex[k]]) {
-          nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-          measureHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
-        } else {
-          measureHolder[otherMeasureIndex[k]]
-              .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
+    /** update all keys if SORT_COLUMNS option is used when creating table */
+    private void finalizeKeys() {
+      // If SORT_COLUMNS is used, may need to update start/end keys since the they may
+      // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
+      // start/end key
+      int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
+      if (numberOfDictSortColumns > 0) {
+        // if SORT_COLUMNS contain dictionary columns
+        int[] keySize = columnarSplitter.getBlockKeySize();
+        if (keySize.length > numberOfDictSortColumns) {
+          // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
+          int newMdkLength = 0;
+          for (int i = 0; i < numberOfDictSortColumns; i++) {
+            newMdkLength += keySize[i];
+          }
+          byte[] newStartKeyOfSortKey = new byte[newMdkLength];
+          byte[] newEndKeyOfSortKey = new byte[newMdkLength];
+          System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
+          System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
+          startKey = newStartKeyOfSortKey;
+          endKey = newEndKeyOfSortKey;
         }
       } else {
-        if (null == row[otherMeasureIndex[k]]) {
-          nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-          measureHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
-        } else {
-          measureHolder[otherMeasureIndex[k]]
-              .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
-        }
+        startKey = new byte[0];
+        endKey = new byte[0];
       }
-    }
-    ByteBuffer byteBuffer = null;
-    byte[] measureBytes = null;
-    for (int i = 0; i < customMeasureIndex.length; i++) {
-      if (null == row[customMeasureIndex[i]]
-          && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        measureBytes = DataTypeUtil.zeroBigDecimalBytes;
-        nullValueIndexBitSet[customMeasureIndex[i]].set(count);
-      } else {
-        if (this.compactionFlow) {
-          BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
-          measureBytes = DataTypeUtil.bigDecimalToByte(bigDecimal);
-        } else {
-          measureBytes = (byte[]) row[customMeasureIndex[i]];
+
+      // Do the same update for noDictionary start/end Key
+      int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
+      if (numberOfNoDictSortColumns > 0) {
+        // if sort_columns contain no-dictionary columns
+        if (noDictStartKey.length > numberOfNoDictSortColumns) {
+          byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
+          byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
+          System.arraycopy(
+              noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
+          System.arraycopy(
+              noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
+          noDictStartKey = newNoDictionaryStartKey;
+          noDictEndKey = newNoDictionaryEndKey;
         }
+        packedNoDictStartKey =
+            NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
+        packedNoDictEndKey =
+            NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
       }
-      byteBuffer = ByteBuffer.allocate(measureBytes.length +
-          CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      byteBuffer.putInt(measureBytes.length);
-      byteBuffer.put(measureBytes);
-      byteBuffer.flip();
-      measureBytes = byteBuffer.array();
-      measureHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, measureBytes);
     }
   }
 
-  private NodeHolder createNodeHolderObjectWithOutKettle(byte[][] measureArray, byte[][] mdKeyArray,
-      byte[][][] noDictionaryArray, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
-      WriterCompressModel compressionModel, byte[][] noDictionaryStartKey,
-      byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
-      throws CarbonDataWriterException {
-    byte[][][] noDictionaryColumnsData = null;
-    List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
-    int complexColCount = getComplexColsCount();
-
-    for (int i = 0; i < complexColCount; i++) {
-      colsAndValues.add(new ArrayList<byte[]>());
+  /**
+   * Represent a page data for all columns, we store its data in columnar layout, so that
+   * all processing apply to TablePage can be done in vectorized fashion.
+   */
+  class TablePage {
+
+    // For all dimension and measure columns, we store the column data directly in the page,
+    // the length of the page is the number of rows.
+
+    // TODO: we should have separate class for key columns so that keys are stored together in
+    // one vector to make it efficient for sorting
+    VarLengthColumnPage[] dictDimensionPage;
+    VarLengthColumnPage[] noDictDimensionPage;
+    ComplexColumnPage[] complexDimensionPage;
+    FixLengthColumnPage[] measurePage;
+
+    // the num of rows in this page, it must be less than short value (65536)
+    int pageSize;
+
+    TablePage(int pageSize) {
+      this.pageSize = pageSize;
+      dictDimensionPage = new VarLengthColumnPage[dimensionCount];
+      for (int i = 0; i < dictDimensionPage.length; i++) {
+        dictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+      }
+      noDictDimensionPage = new VarLengthColumnPage[noDictionaryCount];
+      for (int i = 0; i < noDictDimensionPage.length; i++) {
+        noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+      }
+      complexDimensionPage = new ComplexColumnPage[getComplexColumnCount()];
+      for (int i = 0; i < complexDimensionPage.length; i++) {
+        // here we still do not the depth of the complex column, it will be initialized when
+        // we get the first row.
+        complexDimensionPage[i] = null;
+      }
+      measurePage = new FixLengthColumnPage[measureCount];
+      for (int i = 0; i < measurePage.length; i++) {
+        measurePage[i] = new FixLengthColumnPage(type[i], pageSize);
+      }
     }
-    int noOfColumn = colGrpModel.getNoOfColumnStore();
-    DataHolder[] dataHolders = getDataHolders(noOfColumn, mdKeyArray.length);
-    for (int i = 0; i < mdKeyArray.length; i++) {
-      byte[][] splitKey = columnarSplitter.splitKey(mdKeyArray[i]);
 
-      for (int j = 0; j < splitKey.length; j++) {
-        dataHolders[j].addData(splitKey[j], i);
+    /**
+     * Add one row to the internal store, it will be converted into columnar layout
+     * @param rowId Id of the input row
+     * @param rows row object
+     */
+    void addRow(int rowId, Object[] rows) {
+
+      // convert dictionary columns
+      byte[] MDKey = (byte[]) rows[mdKeyIndex];
+      if (columnarSplitter != null) {
+        byte[][] splitKey = columnarSplitter.splitKey(MDKey);
+        for (int i = 0; i < splitKey.length; i++) {
+          dictDimensionPage[i].putByteArray(rowId, splitKey[i]);
+        }
       }
-    }
-    if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-      noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryArray.length][];
-      for (int i = 0; i < noDictionaryArray.length; i++) {
-        int complexColumnIndex = primitiveDimLens.length + noDictionaryCount;
-        byte[][] splitKey = noDictionaryArray[i];
-
-        int complexTypeIndex = 0;
-        for (int j = 0; j < splitKey.length; j++) {
-          //nodictionary Columns
-          if (j < noDictionaryCount) {
-            int keyLength = splitKey[j].length;
-            byte[] newKey = new byte[keyLength + 2];
-            ByteBuffer buffer = ByteBuffer.wrap(newKey);
-            buffer.putShort((short) keyLength);
-            System.arraycopy(splitKey[j], 0, newKey, 2, keyLength);
-            noDictionaryColumnsData[j][i] = newKey;
-          }
-          //complex types
-          else {
-            // Need to write columnar block from complex byte array
-            int index = complexColumnIndex - noDictionaryCount;
-            GenericDataType complexDataType = complexIndexMap.get(index);
-            complexColumnIndex++;
-            if (complexDataType != null) {
-              List<ArrayList<byte[]>> columnsArray = new ArrayList<ArrayList<byte[]>>();
-              for (int k = 0; k < complexDataType.getColsCount(); k++) {
-                columnsArray.add(new ArrayList<byte[]>());
-              }
-
-              try {
-                ByteBuffer byteArrayInput = ByteBuffer.wrap(splitKey[j]);
-                ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
-                DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
-                complexDataType
-                    .parseAndBitPack(byteArrayInput, dataOutputStream, this.complexKeyGenerator);
-                complexDataType.getColumnarDataForComplexType(columnsArray,
-                    ByteBuffer.wrap(byteArrayOutput.toByteArray()));
-                byteArrayOutput.close();
-              } catch (IOException e) {
-                throw new CarbonDataWriterException(
-                    "Problem while bit packing and writing complex datatype", e);
-              } catch (KeyGenException e) {
-                throw new CarbonDataWriterException(
-                    "Problem while bit packing and writing complex datatype", e);
-              }
-
-              for (ArrayList<byte[]> eachColumn : columnsArray) {
-                colsAndValues.get(complexTypeIndex++).addAll(eachColumn);
-              }
-            } else {
-              // This case not possible as ComplexType is the last columns
-            }
+
+      // convert noDictionary columns and complex columns.
+      if (noDictionaryCount > 0 || complexColCount > 0) {
+        byte[][] noDictAndComplex = (byte[][])(rows[mdKeyIndex - 1]);
+        for (int i = 0; i < noDictAndComplex.length; i++) {
+          if (i < noDictionaryCount) {
+            // noDictionary columns, since it is variable length, we need to prepare each
+            // element as LV encoded byte array (first two bytes are the length of the array)
+            byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
+            noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
+          } else {
+            // complex columns
+            addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
           }
         }
       }
-    }
-    thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
-            CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
-    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
-    List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
-        primitiveDimLens.length + noDictionaryCount + complexColCount);
-    int i = 0;
-    int dictionaryColumnCount = -1;
-    int noDictionaryColumnCount = -1;
-    boolean isSortColumn = false;
-    for (i = 0; i < dimensionType.length; i++) {
-      isSortColumn = i < segmentProperties.getNumberOfSortColumns();
-      if (dimensionType[i]) {
-        dictionaryColumnCount++;
-        if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-          submit.add(executorService.submit(
-              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), isSortColumn,
-                  isUseInvertedIndex[i] & isSortColumn)));
-        } else {
-          submit.add(
-              executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
+
+      // convert measure columns
+      for (int i = 0; i < type.length; i++) {
+        Object value = rows[i];
+
+        // in compaction flow the measure with decimal type will come as spark decimal.
+        // need to convert it to byte array.
+        if (type[i] == DataType.DECIMAL && compactionFlow) {
+          BigDecimal bigDecimal = ((Decimal) rows[i]).toJavaBigDecimal();
+          value = DataTypeUtil.bigDecimalToByte(bigDecimal);
         }
-      } else {
-        submit.add(executorService.submit(
-            new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true,
-                isSortColumn, isUseInvertedIndex[i] & isSortColumn)));
+        measurePage[i].putData(rowId, value);
       }
     }
-    for (int k = 0; k < complexColCount; k++) {
-      submit.add(executorService.submit(new BlockSortThread(i++,
-          colsAndValues.get(k).toArray(new byte[colsAndValues.get(k).size()][]), false, true)));
-    }
-    executorService.shutdown();
-    try {
-      executorService.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    IndexStorage[] blockStorage =
-        new IndexStorage[colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount];
-    try {
-      for (int k = 0; k < blockStorage.length; k++) {
-        blockStorage[k] = submit.get(k).get();
+
+    /**
+     * add a complex column into internal member compleDimensionPage
+     * @param index index of the complexDimensionPage
+     * @param rowId Id of the input row
+     * @param complexColumns byte array the complex columm to be added, extracted of input row
+     */
+    // TODO: this function should be refactoried, ColumnPage should support complex type encoding
+    // directly instead of doing it here
+    private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
+      GenericDataType complexDataType = complexIndexMap.get(index + primitiveDimLens.length);
+
+      // initialize the page if first row
+      if (rowId == 0) {
+        int depthInComplexColumn = complexDataType.getColsCount();
+        complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
       }
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    byte[] composedNonDictStartKey = null;
-    byte[] composedNonDictEndKey = null;
-
-    int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
-    // generate start/end key by sort_columns
-    if (numberOfDictSortColumns > 0) {
-      // if sort_columns contain dictionary columns
-      int[] keySize = columnarSplitter.getBlockKeySize();
-      if (keySize.length > numberOfDictSortColumns) {
-        int newMdkLength = 0;
-        for (int index = 0; index < numberOfDictSortColumns; index++) {
-          newMdkLength += keySize[index];
-        }
-        byte[] newStartKeyOfSortKey = new byte[newMdkLength];
-        byte[] newEndKeyOfSortKey = new byte[newMdkLength];
-        System.arraycopy(startkeyLocal, 0, newStartKeyOfSortKey, 0, newMdkLength);
-        System.arraycopy(endKeyLocal, 0, newEndKeyOfSortKey, 0, newMdkLength);
-        startkeyLocal = newStartKeyOfSortKey;
-        endKeyLocal = newEndKeyOfSortKey;
+
+      int depthInComplexColumn = complexDimensionPage[index].getDepth();
+      // this is the encoded columnar data which will be added to page,
+      // size of this list is the depth of complex column, we will fill it by input data
+      List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
+      for (int k = 0; k < depthInComplexColumn; k++) {
+        encodedComplexColumnar.add(new ArrayList<byte[]>());
+      }
+
+      // encode the complex type data and fill columnsArray
+      try {
+        ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
+        ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
+        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
+        complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream, complexKeyGenerator);
+        complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
+            ByteBuffer.wrap(byteArrayOutput.toByteArray()));
+        byteArrayOutput.close();
+      } catch (IOException | KeyGenException e) {
+        throw new CarbonDataWriterException(
+            "Problem while bit packing and writing complex datatype", e);
       }
-    } else {
-      startkeyLocal = new byte[0];
-      endKeyLocal = new byte[0];
-    }
 
-    int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
-    if (numberOfNoDictSortColumns > 0) {
-      // if sort_columns contain no-dictionary columns
-      if (noDictionaryStartKey.length > numberOfNoDictSortColumns) {
-        byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
-        byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
-        System.arraycopy(noDictionaryStartKey, 0, newNoDictionaryStartKey, 0,
-            numberOfNoDictSortColumns);
-        System
-            .arraycopy(noDictionaryEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
-        noDictionaryStartKey = newNoDictionaryStartKey;
-        noDictionaryEndKey = newNoDictionaryEndKey;
+      for (int depth = 0; depth < depthInComplexColumn; depth++) {
+        complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
       }
-      composedNonDictStartKey =
-          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryStartKey);
-      composedNonDictEndKey =
-          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryEndKey);
     }
-    return this.dataWriter
-        .buildDataNodeHolder(blockStorage, measureArray, entryCountLocal, startkeyLocal,
-            endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey,
-            nullValueIndexBitSet);
+
+    // Adds length as a short element (first 2 bytes) to the head of the input byte array
+    private byte[] addLengthToByteArray(byte[] input) {
+      byte[] output = new byte[input.length + 2];
+      ByteBuffer buffer = ByteBuffer.wrap(output);
+      buffer.putShort((short) input.length);
+      buffer.put(input, 0, input.length);
+      return output;
+    }
+
   }
 
   /**
-   * DataHolder will have all row mdkey data
-   *
-   * @param noOfColumn : no of column participated in mdkey
-   * @param noOfRow    : total no of row
-   * @return : dataholder
+   * generate the NodeHolder from the input rows (one page in case of V3 format)
    */
-  private DataHolder[] getDataHolders(int noOfColumn, int noOfRow) {
-    DataHolder[] dataHolders = new DataHolder[noOfColumn];
-    int colGrpId = -1;
-    for (int colGrp = 0; colGrp < noOfColumn; colGrp++) {
-      if (colGrpModel.isColumnar(colGrp)) {
-        dataHolders[colGrp] = new ColumnDataHolder(noOfRow);
-      } else {
-        ColGroupMinMax colGrpMinMax = new ColGroupMinMax(segmentProperties, ++colGrpId);
-        dataHolders[colGrp] =
-            new ColGroupDataHolder(this.columnarSplitter.getBlockKeySize()[colGrp], noOfRow,
-                colGrpMinMax);
-      }
+  private NodeHolder processDataRows(List<Object[]> dataRows)
+      throws CarbonDataWriterException {
+    TablePage tablePage = new TablePage(dataRows.size());
+    IndexKey keys = new IndexKey(dataRows.size());
+    int rowId = 0;
+
+    // convert row to columnar data
+    for (Object[] row : dataRows) {
+      tablePage.addRow(rowId, row);
+      keys.update(rowId, row);
+      rowId++;
+    }
+
+    // encode and compress dimensions and measure
+    // TODO: To make the encoding more transparent to the user, user should be enable to specify
+    // the encoding and compression method for each type when creating table.
+
+    Codec codec = new Codec();
+    IndexStorage[] dimColumns = codec.encodeAndCompressDimensions(tablePage);
+    Codec encodedMeasure = codec.encodeAndCompressMeasures(tablePage);
+
+    // prepare nullBitSet for writer, remove this after writer can accept TablePage
+    BitSet[] nullBitSet = new BitSet[tablePage.measurePage.length];
+    for (int i = 0; i < nullBitSet.length; i++) {
+      nullBitSet[i] = tablePage.measurePage[i].getNullBitSet();
     }
-    return dataHolders;
+
+    LOGGER.info("Number Of records processed: " + dataRows.size());
+
+    // TODO: writer interface should be modified to use TablePage
+    return dataWriter.buildDataNodeHolder(dimColumns, encodedMeasure.getEncodedMeasure(),
+        dataRows.size(), keys.startKey, keys.endKey, encodedMeasure.getCompressionModel(),
+        keys.packedNoDictStartKey, keys.packedNoDictEndKey, nullBitSet);
   }
 
   /**
@@ -958,7 +787,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return count;
   }
 
-  private int getComplexColsCount() {
+  // return the number of complex column after complex columns are expanded
+  private int getExpandedComplexColsCount() {
     int count = 0;
     for (int i = 0; i < dimensionCount; i++) {
       GenericDataType complexDataType = complexIndexMap.get(i);
@@ -969,6 +799,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return count;
   }
 
+  // return the number of complex column
+  private int getComplexColumnCount() {
+    return complexIndexMap.size();
+  }
+
   /**
    * below method will be used to close the handler
    */
@@ -994,80 +829,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
-   * @param value
-   * @return it return no of value after decimal
-   */
-  private int getDecimalCount(double value) {
-    String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
-    int integerPlaces = strValue.indexOf('.');
-    int decimalPlaces = 0;
-    if (-1 != integerPlaces) {
-      decimalPlaces = strValue.length() - integerPlaces - 1;
-    }
-    return decimalPlaces;
-  }
-
-  /**
-   * This method will be used to update the max value for each measure
-   */
-  private void calculateMaxMin(Object[] max, Object[] min, int[] decimal, int[] msrIndex,
-      Object[] row) {
-    // Update row level min max
-    for (int i = 0; i < msrIndex.length; i++) {
-      int count = msrIndex[i];
-      if (row[count] != null) {
-        if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          double value = (double) row[count];
-          double maxVal = (double) max[count];
-          double minVal = (double) min[count];
-          max[count] = (maxVal > value ? max[count] : value);
-          min[count] = (minVal < value ? min[count] : value);
-          int num = getDecimalCount(value);
-          decimal[count] = (decimal[count] > num ? decimal[count] : num);
-        } else if (type[count] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          long value = (long) row[count];
-          long maxVal = (long) max[count];
-          long minVal = (long) min[count];
-          max[count] = (maxVal > value ? max[count] : value);
-          min[count] = (minVal < value ? min[count] : value);
-        } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          byte[] buff = null;
-          // in compaction flow the measure with decimal type will come as spark decimal.
-          // need to convert it to byte array.
-          if (this.compactionFlow) {
-            BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
-            buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
-          } else {
-            buff = (byte[]) row[count];
-          }
-          BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
-          decimal[count] = value.scale();
-        }
-      }
-    }
-  }
-
-  /**
-   * This method will calculate the unique value which will be used as storage
-   * key for null values of measures
-   *
-   * @param minValue
-   * @param uniqueValue
-   */
-  private void calculateUniqueValue(Object[] minValue, Object[] uniqueValue) {
-    for (int i = 0; i < measureCount; i++) {
-      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        uniqueValue[i] = (long) minValue[i] - 1;
-      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        BigDecimal val = (BigDecimal) minValue[i];
-        uniqueValue[i] = (val.subtract(new BigDecimal(1.0)));
-      } else {
-        uniqueValue[i] = (double) minValue[i] - 1;
-      }
-    }
-  }
-
-  /**
    * Below method will be to configure fact file writing configuration
    *
    * @throws CarbonDataWriterException
@@ -1121,15 +882,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     List<Integer> customMeasureIndexList =
         new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (int j = 0; j < type.length; j++) {
-      if (type[j] != CarbonCommonConstants.BYTE_VALUE_MEASURE
-          && type[j] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+      if (type[j] != DataType.BYTE && type[j] != DataType.DECIMAL) {
         otherMeasureIndexList.add(j);
       } else {
         customMeasureIndexList.add(j);
       }
     }
-    otherMeasureIndex = new int[otherMeasureIndexList.size()];
-    customMeasureIndex = new int[customMeasureIndexList.size()];
+
+    int[] otherMeasureIndex = new int[otherMeasureIndexList.size()];
+    int[] customMeasureIndex = new int[customMeasureIndexList.size()];
     for (int i = 0; i < otherMeasureIndex.length; i++) {
       otherMeasureIndex[i] = otherMeasureIndexList.get(i);
     }
@@ -1156,7 +917,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @return all dimensions cardinality including complex dimension metadata column
    */
   private int[] getBlockKeySizeWithComplexTypes(int[] primitiveBlockKeySize) {
-    int allColsCount = getComplexColsCount();
+    int allColsCount = getExpandedComplexColsCount();
     int[] blockKeySizeWithComplexTypes =
         new int[this.colGrpModel.getNoOfColumnStore() + allColsCount];
 
@@ -1177,52 +938,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return blockKeySizeWithComplexTypes;
   }
 
-  private CarbonWriteDataHolder initialiseKeyBlockHolder(int size) {
-    CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
-    keyDataHolder.initialiseByteArrayValuesForKey(size);
-    return keyDataHolder;
-  }
-
-  private CarbonWriteDataHolder initialiseKeyBlockHolderForNonDictionary(int size) {
-    CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
-    keyDataHolder.initialiseByteArrayValuesForNonDictionary(size);
-    return keyDataHolder;
-  }
-
-  private CarbonWriteDataHolder[] initialiseDataHolder(int size) {
-    CarbonWriteDataHolder[] dataHolder = new CarbonWriteDataHolder[this.measureCount];
-    for (int i = 0; i < otherMeasureIndex.length; i++) {
-      dataHolder[otherMeasureIndex[i]] = new CarbonWriteDataHolder();
-      if (type[otherMeasureIndex[i]] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        dataHolder[otherMeasureIndex[i]].initialiseLongValues(size);
-      } else {
-        dataHolder[otherMeasureIndex[i]].initialiseDoubleValues(size);
-      }
-    }
-    for (int i = 0; i < customMeasureIndex.length; i++) {
-      dataHolder[customMeasureIndex[i]] = new CarbonWriteDataHolder();
-      dataHolder[customMeasureIndex[i]].initialiseByteArrayValues(size);
-    }
-    return dataHolder;
-  }
-
-  /**
-   * Below method will be used to get the bit set array for
-   * all the measure, which will store the indexes which are null
-   *
-   * @param measureCount
-   * @return bit set to store null value index
-   */
-  private BitSet[] getMeasureNullValueIndexBitSet(int measureCount) {
-    // creating a bit set for all the measure column
-    BitSet[] nullvalueIndexBitset = new BitSet[measureCount];
-    for (int i = 0; i < nullvalueIndexBitset.length; i++) {
-      // bitset size will be blocklet size
-      nullvalueIndexBitset[i] = new BitSet(this.blockletSize);
-    }
-    return nullvalueIndexBitset;
-  }
-
   /**
    * Below method will be used to get the fact data writer instance
    *
@@ -1254,7 +969,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setCarbonDataFileAttributes(carbonDataFileAttributes);
     carbonDataWriterVo.setDatabaseName(databaseName);
     carbonDataWriterVo.setWrapperColumnSchemaList(wrapperColumnSchemaList);
-    carbonDataWriterVo.setIsDictionaryColumn(dimensionType);
+    carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
     carbonDataWriterVo.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonDataWriterVo.setColCardinality(colCardinality);
     carbonDataWriterVo.setSegmentProperties(segmentProperties);
@@ -1481,13 +1196,185 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         }
       } else {
         if (version == ColumnarFormatVersion.V3) {
-          return new BlockIndexerStorageForNoInvertedIndexForShort(this.data, isNoDictionary);
+          return new BlockIndexerStorageForNoInvertedIndexForShort(this.data);
+        } else {
+          return new BlockIndexerStorageForNoInvertedIndex(this.data);
+        }
+      }
+
+    }
+
+  }
+
+  public class Codec {
+    private WriterCompressModel compressionModel;
+    private byte[][] encodedMeasureArray;
+
+    Codec() {
+    }
+
+    public WriterCompressModel getCompressionModel() {
+      return compressionModel;
+    }
+
+    public byte[][] getEncodedMeasure() {
+      return encodedMeasureArray;
+    }
+
+    public Codec encodeAndCompressMeasures(TablePage tablePage) {
+      // TODO: following conversion is required only because compress model requires them,
+      // remove then after the compress framework is refactoried
+      FixLengthColumnPage[] measurePage = tablePage.measurePage;
+      int measureCount = measurePage.length;
+      Object[] min = new Object[measurePage.length];
+      Object[] max = new Object[measurePage.length];
+      Object[] uniqueValue = new Object[measurePage.length];
+      int[] decimal = new int[measurePage.length];
+      for (int i = 0; i < measurePage.length; i++) {
+        min[i] = measurePage[i].getStatistics().getMin();
+        max[i] = measurePage[i].getStatistics().getMax();
+        uniqueValue[i] = measurePage[i].getStatistics().getUniqueValue();
+        decimal[i] = measurePage[i].getStatistics().getDecimal();
+      }
+      // encode and compress measure column page
+      compressionModel = ValueCompressionUtil
+          .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[measureCount]);
+      encodedMeasureArray = encodeMeasure(compressionModel, measurePage);
+      return this;
+    }
+
+    // this method first invokes encoding routine to encode the data chunk,
+    // followed by invoking compression routine for preparing the data chunk for writing.
+    private byte[][] encodeMeasure(WriterCompressModel compressionModel,
+        FixLengthColumnPage[] columnPages) {
+
+      CarbonWriteDataHolder[] holders = new CarbonWriteDataHolder[columnPages.length];
+      for (int i = 0; i < holders.length; i++) {
+        holders[i] = new CarbonWriteDataHolder();
+        switch (columnPages[i].getDataType()) {
+          case SHORT:
+          case INT:
+          case LONG:
+            holders[i].setWritableLongPage(columnPages[i].getLongPage());
+            break;
+          case DOUBLE:
+            holders[i].setWritableDoublePage(columnPages[i].getDoublePage());
+            break;
+          case DECIMAL:
+            holders[i].setWritableDecimalPage(columnPages[i].getDecimalPage());
+            break;
+          default:
+            throw new RuntimeException("Unsupported data type: " + columnPages[i].getDataType());
+        }
+      }
+
+      DataType[] dataType = compressionModel.getDataType();
+      ValueCompressionHolder[] values =
+          new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
+      byte[][] returnValue = new byte[values.length][];
+      for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
+        values[i] = compressionModel.getValueCompressionHolder()[i];
+        if (dataType[i] != DataType.DECIMAL) {
+          values[i].setValue(
+              ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
+                  .getCompressedValues(compressionModel.getCompressionFinders()[i], holders[i],
+                      compressionModel.getMaxValue()[i],
+                      compressionModel.getMantissa()[i]));
         } else {
-          return new BlockIndexerStorageForNoInvertedIndex(this.data, isNoDictionary);
+          values[i].setValue(holders[i].getWritableByteArrayValues());
         }
+        values[i].compress();
+        returnValue[i] = values[i].getCompressedData();
       }
 
+      return returnValue;
     }
 
+    /**
+     * Encode and compress each column page. The work is done using a thread pool.
+     */
+    private IndexStorage[] encodeAndCompressDimensions(TablePage tablePage) {
+      int noDictionaryCount = tablePage.noDictDimensionPage.length;
+      int complexColCount = tablePage.complexDimensionPage.length;
+
+      // thread pool size to be used for encoding dimension
+      // each thread will sort the column page data and compress it
+      int thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
+              CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
+      ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
+      Callable<IndexStorage> callable;
+      List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
+          primitiveDimLens.length + noDictionaryCount + complexColCount);
+      int i = 0;
+      int dictionaryColumnCount = -1;
+      int noDictionaryColumnCount = -1;
+      int colGrpId = -1;
+      boolean isSortColumn = false;
+      for (i = 0; i < isDictDimension.length; i++) {
+        isSortColumn = i < segmentProperties.getNumberOfSortColumns();
+        if (isDictDimension[i]) {
+          dictionaryColumnCount++;
+          if (colGrpModel.isColumnar(dictionaryColumnCount)) {
+            // dictionary dimension
+            callable =
+                new BlockSortThread(
+                    i,
+                    tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage(),
+                    isSortColumn,
+                    isUseInvertedIndex[i] & isSortColumn);
+
+          } else {
+            // column group
+            callable = new ColGroupBlockStorage(
+                segmentProperties,
+                ++colGrpId,
+                tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage());
+          }
+        } else {
+          // no dictionary dimension
+          callable =
+              new BlockSortThread(
+                  i,
+                  tablePage.noDictDimensionPage[++noDictionaryColumnCount].getByteArrayPage(),
+                  false,
+                  true,
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+        }
+        // start a thread to sort the page data
+        submit.add(executorService.submit(callable));
+      }
+
+      // complex type column
+      for (int index = 0; index < getComplexColumnCount(); index++) {
+        Iterator<byte[][]> iterator = tablePage.complexDimensionPage[index].iterator();
+        while (iterator.hasNext()) {
+          callable =
+              new BlockSortThread(
+                  i++,
+                  iterator.next(),
+                  false,
+                  true);
+          submit.add(executorService.submit(callable));
+        }
+      }
+      executorService.shutdown();
+      try {
+        executorService.awaitTermination(1, TimeUnit.DAYS);
+      } catch (InterruptedException e) {
+        LOGGER.error(e, e.getMessage());
+      }
+      IndexStorage[] dimColumns = new IndexStorage[
+          colGrpModel.getNoOfColumnStore() + noDictionaryCount + getExpandedComplexColsCount()];
+      try {
+        for (int k = 0; k < dimColumns.length; k++) {
+          dimColumns[k] = submit.get(k).get();
+        }
+      } catch (Exception e) {
+        LOGGER.error(e, e.getMessage());
+      }
+      return dimColumns;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 6fd29d7..44958a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.processing.store;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -29,13 +28,13 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -146,10 +145,9 @@ public class CarbonFactDataHandlerModel {
   private int[] primitiveDimLens;
 
   /**
-   * array in which each character represents an aggregation type and
-   * the array length will be equal to the number of measures in table
+   * data type of all measures in the table
    */
-  private char[] aggType;
+  private DataType[] measureDataType;
   /**
    * carbon data file attributes like task id, file stamp
    */
@@ -283,8 +281,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
     carbonFactDataHandlerModel.setColCardinality(colCardinality);
     carbonFactDataHandlerModel.setDataWritingRequest(true);
-    carbonFactDataHandlerModel.setAggType(CarbonDataProcessorUtil
-        .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields()));
+    carbonFactDataHandlerModel.setMeasureDataType(CarbonDataProcessorUtil
+        .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields()));
     carbonFactDataHandlerModel.setFactDimLens(dimLens);
     carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
     carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
@@ -340,13 +338,12 @@ public class CarbonFactDataHandlerModel {
         new HashMap<Integer, GenericDataType>(segmentProperties.getComplexDimensions().size());
     carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
     carbonFactDataHandlerModel.setDataWritingRequest(true);
-    char[] aggType = new char[segmentProperties.getMeasures().size()];
-    Arrays.fill(aggType, 'n');
+    DataType[] aggType = new DataType[segmentProperties.getMeasures().size()];
     int i = 0;
     for (CarbonMeasure msr : segmentProperties.getMeasures()) {
-      aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
+      aggType[i++] = msr.getDataType();
     }
-    carbonFactDataHandlerModel.setAggType(aggType);
+    carbonFactDataHandlerModel.setMeasureDataType(aggType);
     carbonFactDataHandlerModel.setFactDimLens(segmentProperties.getDimColumnsCardinality());
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
         .checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), loadModel.getDatabaseName(),
@@ -501,12 +498,12 @@ public class CarbonFactDataHandlerModel {
     this.primitiveDimLens = primitiveDimLens;
   }
 
-  public char[] getAggType() {
-    return aggType;
+  public DataType[] getMeasureDataType() {
+    return measureDataType;
   }
 
-  public void setAggType(char[] aggType) {
-    this.aggType = aggType;
+  public void setMeasureDataType(DataType[] measureDataType) {
+    this.measureDataType = measureDataType;
   }
 
   public String getCarbonDataDirectoryPath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index f8454f1..6a33f34 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChunkHolder;
@@ -93,7 +94,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    */
   private String tempFileLocation;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   /**
    * below code is to check whether dimension
@@ -105,13 +106,13 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
   public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName,
       int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
-      char[] aggType, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
+      DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.dimensionCount = dimensionCount;
     this.complexDimensionCount = complexDimensionCount;
     this.measureCount = measureCount;
-    this.aggType = aggType;
+    this.measureDataType = type;
     this.noDictionaryCount = noDictionaryCount;
     this.isNoDictionaryColumn = isNoDictionaryColumn;
     this.isNoDictionarySortColumn = isNoDictionarySortColumn;
@@ -183,8 +184,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
           // create chunk holder
           SortTempFileChunkHolder sortTempFileChunkHolder =
               new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn,
-                  isNoDictionarySortColumn);
+                  measureCount, fileBufferSize, noDictionaryCount, measureDataType,
+                  isNoDictionaryColumn, isNoDictionarySortColumn);
 
           // initialize
           sortTempFileChunkHolder.initialize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
index 2049bec..def2aaa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
@@ -14,10 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.carbondata.processing.store.colgroup;
 
 import java.util.concurrent.Callable;
 
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 
 /**
@@ -25,8 +27,22 @@ import org.apache.carbondata.core.datastore.columnar.IndexStorage;
  */
 public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> {
 
+  private byte[][] data;
+
+  private ColGroupMinMax colGrpMinMax;
+
+  public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex, byte[][] data) {
+    colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex);
+    this.data = data;
+    for (int i = 0; i < data.length; i++) {
+      colGrpMinMax.add(data[i]);
+    }
+  }
+
+  @Deprecated
   private ColGroupDataHolder colGrpDataHolder;
 
+  @Deprecated
   public ColGroupBlockStorage(DataHolder colGrpDataHolder) {
     this.colGrpDataHolder = (ColGroupDataHolder) colGrpDataHolder;
   }
@@ -58,7 +74,7 @@ public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage
    * for column group storage its not required
    */
   @Override public byte[][] getKeyBlock() {
-    return colGrpDataHolder.getData();
+    return data;
   }
 
   /**
@@ -73,22 +89,19 @@ public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage
    * for column group storage its not required
    */
   @Override public int getTotalSize() {
-    return colGrpDataHolder.getTotalSize();
+    return data.length;
   }
 
-  /**
-   * Get min max of column group storage
-   */
   @Override public byte[] getMin() {
-    return colGrpDataHolder.getMin();
+    return colGrpMinMax.getMin();
   }
 
   @Override public byte[] getMax() {
-    return colGrpDataHolder.getMax();
+    return colGrpMinMax.getMax();
   }
 
   /**
-   * return
+   * return self
    */
   @Override public IndexStorage call() throws Exception {
     return this;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 6f05b69..85cacd0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -105,10 +105,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     int[] keyLengths = new int[keyStorageArray.length];
 
     // below will calculate min and max value for each column
-    // for below 2d array, first index will be for column and second will be min
-    // max
+    // for below 2d array, first index will be for column and second will be min and max
     // value for same column
-    // byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
 
     byte[][] dimensionMinValue = new byte[keyStorageArray.length][];
     byte[][] dimensionMaxValue = new byte[keyStorageArray.length][];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index c6eeb4c..3c090fe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.util;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,7 +47,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
@@ -406,31 +404,28 @@ public final class CarbonDataProcessorUtil {
     return columnNames;
   }
 
-  /**
-   * get agg type
-   */
-  public static char[] getAggType(int measureCount, String databaseName, String tableName) {
-    char[] aggType = new char[measureCount];
-    Arrays.fill(aggType, 'n');
+
+  public static DataType[] getMeasureDataType(int measureCount, String databaseName,
+      String tableName) {
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = DataType.DOUBLE;
+    }
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
         databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
     List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
-    for (int i = 0; i < aggType.length; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
+    for (int i = 0; i < type.length; i++) {
+      type[i] = measures.get(i).getDataType();
     }
-    return aggType;
+    return type;
   }
 
-  /**
-   * get agg type
-   */
-  public static char[] getAggType(int measureCount, DataField[] measureFields) {
-    char[] aggType = new char[measureCount];
-    Arrays.fill(aggType, 'n');
-    for (int i = 0; i < measureFields.length; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measureFields[i].getColumn().getDataType());
+  public static DataType[] getMeasureDataType(int measureCount, DataField[] measureFields) {
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = measureFields[i].getColumn().getDataType();
     }
-    return aggType;
+    return type;
   }
 
   /**
@@ -509,16 +504,19 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * initialise aggregation type for measures for their storage format
+   * initialise data type for measures for their storage format
    */
-  public static char[] initAggType(CarbonTable carbonTable, String tableName, int measureCount) {
-    char[] aggType = new char[measureCount];
-    Arrays.fill(aggType, 'n');
+  public static DataType[] initDataType(CarbonTable carbonTable, String tableName,
+      int measureCount) {
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = DataType.DOUBLE;
+    }
     List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
     for (int i = 0; i < measureCount; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
+      type[i] = measures.get(i).getDataType();
     }
-    return aggType;
+    return type;
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java b/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
index da10d7a..038ac03 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -159,6 +160,7 @@ public class ColGroupMinMaxTest {
 		}
 	}
 
+	@Ignore
 	@Test
 	public void testRowStoreMinMax() throws KeyGenException {
 


[5/5] carbondata git commit: [CARBONDATA-1015] Refactory write step and add ColumnPage in data load This closes #852

Posted by ja...@apache.org.
[CARBONDATA-1015] Refactory write step and add ColumnPage in data load This closes #852


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a161db4e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a161db4e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a161db4e

Branch: refs/heads/12-dev
Commit: a161db4e254c0d62988287ef928add0cb26194c7
Parents: b72a90e a79a3a1
Author: jackylk <ja...@huawei.com>
Authored: Wed May 10 09:12:07 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed May 10 09:12:07 2017 +0800

----------------------------------------------------------------------
 .../core/compression/BigIntCompressor.java      |   9 +-
 .../core/compression/DoubleCompressor.java      |  43 +-
 .../core/compression/ValueCompressor.java       |   3 +-
 .../core/constants/CarbonCommonConstants.java   |   6 -
 .../chunk/store/MeasureChunkStoreFactory.java   |  26 +-
 .../BlockIndexerStorageForNoInvertedIndex.java  |   2 +-
 ...ndexerStorageForNoInvertedIndexForShort.java |   3 +-
 .../compression/MeasureMetaDataModel.java       |   8 +-
 .../compression/ReaderCompressModel.java        |   8 +-
 .../compression/ValueCompressionHolder.java     |  26 +-
 .../compression/WriterCompressModel.java        |  41 +-
 .../compression/decimal/CompressByteArray.java  |   4 +-
 .../decimal/CompressionMaxMinByte.java          |   6 +-
 .../decimal/CompressionMaxMinDefault.java       |   6 +-
 .../decimal/CompressionMaxMinInt.java           |   6 +-
 .../decimal/CompressionMaxMinLong.java          |   6 +-
 .../decimal/CompressionMaxMinShort.java         |   6 +-
 .../nondecimal/CompressionNonDecimalByte.java   |   6 +-
 .../CompressionNonDecimalDefault.java           |   6 +-
 .../nondecimal/CompressionNonDecimalInt.java    |   6 +-
 .../nondecimal/CompressionNonDecimalLong.java   |   6 +-
 .../CompressionNonDecimalMaxMinByte.java        |   6 +-
 .../CompressionNonDecimalMaxMinDefault.java     |   6 +-
 .../CompressionNonDecimalMaxMinInt.java         |   6 +-
 .../CompressionNonDecimalMaxMinLong.java        |   6 +-
 .../CompressionNonDecimalMaxMinShort.java       |   6 +-
 .../nondecimal/CompressionNonDecimalShort.java  |   6 +-
 .../compression/none/CompressionNoneByte.java   |   6 +-
 .../none/CompressionNoneDefault.java            |   6 +-
 .../compression/none/CompressionNoneInt.java    |   6 +-
 .../compression/none/CompressionNoneLong.java   |   6 +-
 .../compression/none/CompressionNoneShort.java  |   6 +-
 .../dataholder/CarbonWriteDataHolder.java       | 156 +--
 .../HeavyCompressedDoubleArrayDataStore.java    |  57 --
 .../core/datastore/page/ColumnPage.java         |  42 +
 .../core/datastore/page/ComplexColumnPage.java  |  77 ++
 .../datastore/page/FixLengthColumnPage.java     | 155 +++
 .../datastore/page/VarLengthColumnPage.java     |  42 +
 .../datastore/page/compression/Compression.java |  23 +
 .../datastore/page/encoding/ColumnCodec.java    |  35 +
 .../datastore/page/encoding/DummyCodec.java     |  37 +
 .../page/statistics/PageStatistics.java         | 124 +++
 .../page/statistics/StatisticsCollector.java    |  66 ++
 .../core/metadata/ValueEncoderMeta.java         |  18 +-
 .../core/metadata/datatype/DataType.java        |   6 +-
 .../apache/carbondata/core/util/ByteUtil.java   |   2 +
 .../core/util/CarbonMetadataUtil.java           |  44 +-
 .../carbondata/core/util/CarbonProperties.java  |   1 -
 .../apache/carbondata/core/util/CarbonUtil.java |   2 +-
 .../carbondata/core/util/CompressionFinder.java |  10 +-
 .../carbondata/core/util/DataTypeUtil.java      |   3 -
 .../apache/carbondata/core/util/NodeHolder.java |  31 +-
 .../core/util/ValueCompressionUtil.java         | 190 ++--
 .../core/util/CarbonMetadataUtilTest.java       |  11 +-
 .../core/util/ValueCompressionUtilTest.java     | 114 +--
 .../core/writer/CarbonFooterWriterTest.java     |   2 +-
 .../examples/CarbonSessionExample.scala         |   1 +
 format/src/main/thrift/carbondata.thrift        |   8 +-
 .../testsuite/emptyrow/TestEmptyRows.scala      |   2 +-
 .../dataload/TestLoadDataWithNoMeasure.scala    |   8 +-
 .../ColumnGroupDataTypesTestCase.scala          |   6 +
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   2 +-
 .../merger/CompactionResultSortProcessor.java   |  17 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   2 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   2 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 133 +--
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  10 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |  29 +-
 .../merger/UnsafeIntermediateFileMerger.java    |  38 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |  10 +-
 .../sortdata/IntermediateFileMerger.java        |  37 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  29 +-
 .../sortandgroupby/sortdata/SortParameters.java |  25 +-
 .../sortdata/SortTempFileChunkHolder.java       |  29 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 975 ++++++++-----------
 .../store/CarbonFactDataHandlerModel.java       |  27 +-
 .../store/SingleThreadFinalSortFilesMerger.java |  11 +-
 .../store/colgroup/ColGroupBlockStorage.java    |  29 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   4 +-
 .../util/CarbonDataProcessorUtil.java           |  50 +-
 .../store/colgroup/ColGroupMinMaxTest.java      |   2 +
 81 files changed, 1727 insertions(+), 1306 deletions(-)
----------------------------------------------------------------------



[2/5] carbondata git commit: refactor write step based on ColumnPage

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 059c734..a515f0b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -141,7 +141,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
         match {
           case parser.Success(field, _) => field.asInstanceOf[Field]
           case failureOrError => throw new MalformedCarbonCommandException(
-            s"Unsupported data type: $col.getType")
+            s"Unsupported data type: $col.getDataType")
         }
         // the data type of the decimal type will be like decimal(10,0)
         // so checking the start of the string and taking the precision and scale.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 81ee408..690f6ef 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -43,6 +44,8 @@ import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
+import org.apache.spark.sql.types.Decimal;
+
 /**
  * This class will process the query result and convert the data
  * into a format compatible for data load
@@ -89,7 +92,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   /**
    * agg type defined for measures
    */
-  private char[] aggType;
+  private DataType[] aggType;
   /**
    * segment id
    */
@@ -243,14 +246,14 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * This method will convert the spark decimal to java big decimal type
    *
    * @param value
-   * @param aggType
+   * @param type
    * @return
    */
-  private Object getConvertedMeasureValue(Object value, char aggType) {
-    switch (aggType) {
-      case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+  private Object getConvertedMeasureValue(Object value, DataType type) {
+    switch (type) {
+      case DECIMAL:
         if (value != null) {
-          value = ((org.apache.spark.sql.types.Decimal) value).toJavaBigDecimal();
+          value = ((Decimal) value).toJavaBigDecimal();
         }
         return value;
       default:
@@ -404,6 +407,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * initialise aggregation type for measures for their storage format
    */
   private void initAggType() {
-    aggType = CarbonDataProcessorUtil.initAggType(carbonTable, tableName, measureCount);
+    aggType = CarbonDataProcessorUtil.initDataType(carbonTable, tableName, measureCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 0e14660..c1aafcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -82,7 +82,7 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
         new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
             sortParameters.getDimColCount(),
             sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
-            sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
+            sortParameters.getNoDictionaryCount(), sortParameters.getMeasureDataType(),
             sortParameters.getNoDictionaryDimnesionColumn(),
             sortParameters.getNoDictionarySortColumn());
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 60231c5..c8977ac 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -142,7 +142,7 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
     return new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
             sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(),
             sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(),
-            sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(),
+            sortParameters.getMeasureDataType(), sortParameters.getNoDictionaryDimnesionColumn(),
             this.sortParameters.getNoDictionarySortColumn());
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index 44f11f7..24109e4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -22,9 +22,9 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Arrays;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
@@ -40,7 +40,7 @@ public class UnsafeCarbonRowPage {
 
   private int measureSize;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   private long[] nullSetWords;
 
@@ -55,13 +55,13 @@ public class UnsafeCarbonRowPage {
   private boolean saveToDisk;
 
   public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
-      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, char[] aggType,
+      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
       MemoryBlock memoryBlock, boolean saveToDisk) {
     this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
     this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
     this.dimensionSize = dimensionSize;
     this.measureSize = measureSize;
-    this.aggType = aggType;
+    this.measureDataType = type;
     this.saveToDisk = saveToDisk;
     this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
     buffer = new IntPointerBuffer(memoryBlock);
@@ -116,24 +116,30 @@ public class UnsafeCarbonRowPage {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          Double val = (Double) value;
-          CarbonUnsafe.unsafe.putDouble(baseObject, address + size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          Long val = (Long) value;
-          CarbonUnsafe.unsafe.putLong(baseObject, address + size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          BigDecimal val = (BigDecimal) value;
-          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-          CarbonUnsafe.unsafe.putShort(baseObject, address + size,
-              (short) bigDecimalInBytes.length);
-          size += 2;
-          CarbonUnsafe.unsafe
-              .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-                  address + size, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            Long val = (Long) value;
+            CarbonUnsafe.unsafe.putLong(baseObject, address + size, val);
+            size += 8;
+            break;
+          case DOUBLE:
+            Double doubleVal = (Double) value;
+            CarbonUnsafe.unsafe.putDouble(baseObject, address + size, doubleVal);
+            size += 8;
+            break;
+          case DECIMAL:
+            BigDecimal decimalVal = (BigDecimal) value;
+            byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
+            CarbonUnsafe.unsafe.putShort(baseObject, address + size,
+                (short) bigDecimalInBytes.length);
+            size += 2;
+            CarbonUnsafe.unsafe
+                .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
+                    address + size, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            break;
         }
         set(nullSetWords, mesCount);
       } else {
@@ -187,22 +193,28 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          Double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
-          size += 8;
-          rowToFill[dimensionSize + mesCount] = val;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          Long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
-          size += 8;
-          rowToFill[dimensionSize + mesCount] = val;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
-          byte[] bigDecimalInBytes = new byte[aShort];
-          size += 2;
-          CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
-              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
-          rowToFill[dimensionSize + mesCount] = bigDecimalInBytes;
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            Long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+            size += 8;
+            rowToFill[dimensionSize + mesCount] = val;
+            break;
+          case DOUBLE:
+            Double doubleVal = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+            size += 8;
+            rowToFill[dimensionSize + mesCount] = doubleVal;
+            break;
+          case DECIMAL:
+            short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+            byte[] bigDecimalInBytes = new byte[aShort];
+            size += 2;
+            CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            rowToFill[dimensionSize + mesCount] = bigDecimalInBytes;
+            break;
         }
       } else {
         rowToFill[dimensionSize + mesCount] = null;
@@ -258,33 +270,34 @@ public class UnsafeCarbonRowPage {
 
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       if (isSet(nullSetWords, mesCount)) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          double val = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
-          size += 8;
-          stream.writeDouble(val);
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
-          size += 8;
-          stream.writeLong(val);
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
-          byte[] bigDecimalInBytes = new byte[aShort];
-          size += 2;
-          CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
-              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
-          stream.writeShort(aShort);
-          stream.write(bigDecimalInBytes);
+        switch (measureDataType[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            long val = CarbonUnsafe.unsafe.getLong(baseObject, address + size);
+            size += 8;
+            stream.writeLong(val);
+            break;
+          case DOUBLE:
+            double doubleVal = CarbonUnsafe.unsafe.getDouble(baseObject, address + size);
+            size += 8;
+            stream.writeDouble(doubleVal);
+            break;
+          case DECIMAL:
+            short aShort = CarbonUnsafe.unsafe.getShort(baseObject, address + size);
+            byte[] bigDecimalInBytes = new byte[aShort];
+            size += 2;
+            CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes,
+                CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
+            size += bigDecimalInBytes.length;
+            stream.writeShort(aShort);
+            stream.write(bigDecimalInBytes);
+            break;
         }
       }
     }
   }
 
-  private Object[] getRow(long address) {
-    Object[] row = new Object[dimensionSize + measureSize];
-    return getRow(address, row);
-  }
-
   public void freeMemory() {
     buffer.freeMemory();
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 40608fa..a9c0cb7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -115,7 +115,7 @@ public class UnsafeSortDataRows {
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
         parameters.getNoDictionarySortColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
-        parameters.getMeasureColCount(), parameters.getAggType(), baseBlock,
+        parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
         !UnsafeMemoryManager.INSTANCE.isMemoryAvailable());
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
@@ -178,10 +178,14 @@ public class UnsafeSortDataRows {
             dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
             MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
             boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
-            rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
+            rowPage = new UnsafeCarbonRowPage(
+                parameters.getNoDictionaryDimnesionColumn(),
                 parameters.getNoDictionarySortColumn(),
                 parameters.getDimColCount() + parameters.getComplexDimColCount(),
-                parameters.getMeasureColCount(), parameters.getAggType(), memoryBlock, saveToDisk);
+                parameters.getMeasureColCount(),
+                parameters.getMeasureDataType(),
+                memoryBlock,
+                saveToDisk);
             bytesAdded += rowPage.addRow(rowBatch[i]);
           } catch (Exception e) {
             LOGGER.error(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index cfdb69a..aee4e51 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Future;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
@@ -122,7 +123,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
   private int noDictionaryCount;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   private int numberOfObjectRead;
   /**
@@ -150,7 +151,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     // set mdkey length
     this.fileBufferSize = parameters.getFileBufferSize();
     this.executorService = Executors.newFixedThreadPool(1);
-    this.aggType = parameters.getAggType();
+    this.measureDataType = parameters.getMeasureDataType();
     this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
     this.nullSetWordsLength = ((measureCount - 1) >> 6) + 1;
     comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
@@ -323,15 +324,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
       for (int mesCount = 0; mesCount < measureCount; mesCount++) {
         if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            row[dimensionCount + mesCount] = stream.readDouble();
-          } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            row[dimensionCount + mesCount] = stream.readLong();
-          } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-            short aShort = stream.readShort();
-            byte[] bigDecimalInBytes = new byte[aShort];
-            stream.readFully(bigDecimalInBytes);
-            row[dimensionCount + mesCount] = bigDecimalInBytes;
+          switch (measureDataType[mesCount]) {
+            case SHORT:
+            case INT:
+            case LONG:
+              row[dimensionCount + mesCount] = stream.readLong();
+              break;
+            case DOUBLE:
+              row[dimensionCount + mesCount] = stream.readDouble();
+              break;
+            case DECIMAL:
+              short aShort = stream.readShort();
+              byte[] bigDecimalInBytes = new byte[aShort];
+              stream.readFully(bigDecimalInBytes);
+              row[dimensionCount + mesCount] = bigDecimalInBytes;
+              break;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index e52dc8a..90c3b69 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -31,7 +31,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder;
@@ -278,7 +278,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
     int dimCount = 0;
     int size = 0;
-    char[] aggType = mergerParameters.getAggType();
+    DataType[] type = mergerParameters.getMeasureDataType();
     for (; dimCount < noDictionarycolumnMapping.length; dimCount++) {
       if (noDictionarycolumnMapping[dimCount]) {
         byte[] col = (byte[]) row[dimCount];
@@ -310,21 +310,25 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     for (int mesCount = 0; mesCount < measureSize; mesCount++) {
       Object value = row[mesCount + dimensionSize];
       if (null != value) {
-        if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          Double val = (Double) value;
-          rowData.putDouble(size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          Long val = (Long) value;
-          rowData.putLong(size, val);
-          size += 8;
-        } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          byte[] bigDecimalInBytes = (byte[]) value;
-          rowData.putShort(size, (short)bigDecimalInBytes.length);
-          size += 2;
-          for (int i = 0; i < bigDecimalInBytes.length; i++) {
-            rowData.put(size++, bigDecimalInBytes[i]);
-          }
+        switch (type[mesCount]) {
+          case SHORT:
+          case INT:
+          case LONG:
+            rowData.putLong(size, (Long) value);
+            size += 8;
+            break;
+          case DOUBLE:
+            rowData.putDouble(size, (Double) value);
+            size += 8;
+            break;
+          case DECIMAL:
+            byte[] bigDecimalInBytes = (byte[]) value;
+            rowData.putShort(size, (short)bigDecimalInBytes.length);
+            size += 2;
+            for (int i = 0; i < bigDecimalInBytes.length; i++) {
+              rowData.put(size++, bigDecimalInBytes[i]);
+            }
+            break;
         }
         UnsafeCarbonRowPage.set(nullSetWords, mesCount);
       } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
index c50f335..0f0a5b0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -26,11 +26,11 @@ import java.util.concurrent.Future;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
@@ -64,7 +64,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private boolean[] isNoDictionaryDimensionColumn;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   private int dimensionCount;
 
@@ -115,8 +115,8 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       dimensionCount = configuration.getDimensionCount() - noDictWithComplextCount;
       isNoDictionaryDimensionColumn =
           CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
-      aggType = CarbonDataProcessorUtil
-          .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields());
+      measureDataType = CarbonDataProcessorUtil
+          .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
 
       CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel
           .createCarbonFactDataHandlerModel(configuration,
@@ -266,7 +266,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     for (; l < this.measureCount; l++) {
       Object value = row.getObject(l + this.dimensionWithComplexCount);
       if (null != value) {
-        if (aggType[l] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+        if (measureDataType[l] == DataType.DECIMAL) {
           BigDecimal val = (BigDecimal) value;
           outputRow[l] = DataTypeUtil.bigDecimalToByte(val);
         } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index a9e762d..d20292c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -29,7 +29,7 @@ import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.NonDictionaryUtil;
@@ -251,7 +251,8 @@ public class IntermediateFileMerger implements Callable<Void> {
           new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
               mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
               mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
+              mergerParameters.getMeasureDataType(),
+              mergerParameters.getNoDictionaryDimnesionColumn(),
               mergerParameters.getNoDictionarySortColumn());
 
       // initialize
@@ -319,7 +320,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       return;
     }
     try {
-      char[] aggType = mergerParameters.getAggType();
+      DataType[] aggType = mergerParameters.getMeasureDataType();
       int[] mdkArray = (int[]) row[0];
       byte[][] nonDictArray = (byte[][]) row[1];
       int mdkIndex = 0;
@@ -339,27 +340,27 @@ public class IntermediateFileMerger implements Callable<Void> {
       for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
         if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
           stream.write((byte) 1);
-          if (aggType[counter] == CarbonCommonConstants.BYTE_VALUE_MEASURE) {
-            Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeDouble(val);
-          } else if (aggType[counter] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            Double val = (Double) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeDouble(val);
-          } else if (aggType[counter] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeLong(val);
-          } else if (aggType[counter] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-            byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
-            stream.writeInt(bigDecimalInBytes.length);
-            stream.write(bigDecimalInBytes);
+          switch (aggType[counter]) {
+            case SHORT:
+            case INT:
+            case LONG:
+              Long val = (Long) NonDictionaryUtil.getMeasure(fieldIndex, row);
+              stream.writeLong(val);
+              break;
+            case DOUBLE:
+              stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
+              break;
+            case DECIMAL:
+              byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
+              stream.writeInt(bigDecimalInBytes.length);
+              stream.write(bigDecimalInBytes);
+              break;
           }
         } else {
           stream.write((byte) 0);
         }
-
         fieldIndex++;
       }
-
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index eba5433..af654e2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -256,7 +257,7 @@ public class SortDataRows {
       stream.writeInt(entryCountLocal);
       int complexDimColCount = parameters.getComplexDimColCount();
       int dimColCount = parameters.getDimColCount() + complexDimColCount;
-      char[] aggType = parameters.getAggType();
+      DataType[] type = parameters.getMeasureDataType();
       boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
       Object[] row = null;
       for (int i = 0; i < entryCountLocal; i++) {
@@ -285,17 +286,21 @@ public class SortDataRows {
           Object value = row[mesCount + dimColCount];
           if (null != value) {
             stream.write((byte) 1);
-            if (aggType[mesCount] == CarbonCommonConstants.DOUBLE_MEASURE) {
-              Double val = (Double) value;
-              stream.writeDouble(val);
-            } else if (aggType[mesCount] == CarbonCommonConstants.BIG_INT_MEASURE) {
-              Long val = (Long) value;
-              stream.writeLong(val);
-            } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-              BigDecimal val = (BigDecimal) value;
-              byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-              stream.writeInt(bigDecimalInBytes.length);
-              stream.write(bigDecimalInBytes);
+            switch (type[mesCount]) {
+              case SHORT:
+              case INT:
+              case LONG:
+                stream.writeLong((Long) value);
+                break;
+              case DOUBLE:
+                stream.writeDouble((Double) value);
+                break;
+              case DECIMAL:
+                BigDecimal val = (BigDecimal) value;
+                byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
+                stream.writeInt(bigDecimalInBytes.length);
+                stream.write(bigDecimalInBytes);
+                break;
             }
           } else {
             stream.write((byte) 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 7ef8f8e..8ac1491 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -22,6 +22,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.schema.metadata.SortObserver;
@@ -88,7 +89,7 @@ public class SortParameters {
 
   private String tableName;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   /**
    * To know how many columns are of high cardinality.
@@ -137,7 +138,7 @@ public class SortParameters {
     parameters.bufferSize = bufferSize;
     parameters.databaseName = databaseName;
     parameters.tableName = tableName;
-    parameters.aggType = aggType;
+    parameters.measureDataType = measureDataType;
     parameters.noDictionaryCount = noDictionaryCount;
     parameters.partitionID = partitionID;
     parameters.segmentId = segmentId;
@@ -270,12 +271,12 @@ public class SortParameters {
     this.tableName = tableName;
   }
 
-  public char[] getAggType() {
-    return aggType;
+  public DataType[] getMeasureDataType() {
+    return measureDataType;
   }
 
-  public void setAggType(char[] aggType) {
-    this.aggType = aggType;
+  public void setMeasureDataType(DataType[] measureDataType) {
+    this.measureDataType = measureDataType;
   }
 
   public int getNoDictionaryCount() {
@@ -458,9 +459,9 @@ public class SortParameters {
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
-    char[] aggType = CarbonDataProcessorUtil
-        .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields());
-    parameters.setAggType(aggType);
+    DataType[] measureDataType = CarbonDataProcessorUtil
+        .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
+    parameters.setMeasureDataType(measureDataType);
     return parameters;
   }
 
@@ -560,10 +561,10 @@ public class SortParameters {
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
-    char[] aggType = CarbonDataProcessorUtil
-        .getAggType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
+    DataType[] type = CarbonDataProcessorUtil
+        .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
             parameters.getTableName());
-    parameters.setAggType(aggType);
+    parameters.setMeasureDataType(type);
     return parameters;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index 6695a5b..a4fdec1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Future;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -125,7 +126,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
   private int noDictionaryCount;
 
-  private char[] aggType;
+  private DataType[] aggType;
 
   /**
    * to store whether dimension is of dictionary type or not
@@ -150,7 +151,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * @param isNoDictionaryDimensionColumn
    */
   public SortTempFileChunkHolder(File tempFile, int dimensionCount, int complexDimensionCount,
-      int measureCount, int fileBufferSize, int noDictionaryCount, char[] aggType,
+      int measureCount, int fileBufferSize, int noDictionaryCount, DataType[] aggType,
       boolean[] isNoDictionaryDimensionColumn, boolean[] isNoDictionarySortColumn) {
     // set temp file
     this.tempFile = tempFile;
@@ -338,15 +339,21 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
       // read measure values
       for (int i = 0; i < this.measureCount; i++) {
         if (stream.readByte() == 1) {
-          if (aggType[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            measures[index++] = stream.readDouble();
-          } else if (aggType[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            measures[index++] = stream.readLong();
-          } else {
-            int len = stream.readInt();
-            byte[] buff = new byte[len];
-            stream.readFully(buff);
-            measures[index++] = buff;
+          switch (aggType[i]) {
+            case SHORT:
+            case INT:
+            case LONG:
+              measures[index++] = stream.readLong();
+              break;
+            case DOUBLE:
+              measures[index++] = stream.readDouble();
+              break;
+            case DECIMAL:
+              int len = stream.readInt();
+              byte[] buff = new byte[len];
+              stream.readFully(buff);
+              measures[index++] = buff;
+              break;
           }
         } else {
           measures[index++] = null;


[4/5] carbondata git commit: refactor write step based on ColumnPage

Posted by ja...@apache.org.
refactor write step based on ColumnPage


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a79a3a16
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a79a3a16
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a79a3a16

Branch: refs/heads/12-dev
Commit: a79a3a16c1aab00858d5ebb4275453c41f089ad1
Parents: b72a90e
Author: jackylk <ja...@huawei.com>
Authored: Thu May 4 23:32:07 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Wed May 10 01:14:07 2017 +0800

----------------------------------------------------------------------
 .../core/compression/BigIntCompressor.java      |   9 +-
 .../core/compression/DoubleCompressor.java      |  43 +-
 .../core/compression/ValueCompressor.java       |   3 +-
 .../core/constants/CarbonCommonConstants.java   |   6 -
 .../chunk/store/MeasureChunkStoreFactory.java   |  26 +-
 .../BlockIndexerStorageForNoInvertedIndex.java  |   2 +-
 ...ndexerStorageForNoInvertedIndexForShort.java |   3 +-
 .../compression/MeasureMetaDataModel.java       |   8 +-
 .../compression/ReaderCompressModel.java        |   8 +-
 .../compression/ValueCompressionHolder.java     |  26 +-
 .../compression/WriterCompressModel.java        |  41 +-
 .../compression/decimal/CompressByteArray.java  |   4 +-
 .../decimal/CompressionMaxMinByte.java          |   6 +-
 .../decimal/CompressionMaxMinDefault.java       |   6 +-
 .../decimal/CompressionMaxMinInt.java           |   6 +-
 .../decimal/CompressionMaxMinLong.java          |   6 +-
 .../decimal/CompressionMaxMinShort.java         |   6 +-
 .../nondecimal/CompressionNonDecimalByte.java   |   6 +-
 .../CompressionNonDecimalDefault.java           |   6 +-
 .../nondecimal/CompressionNonDecimalInt.java    |   6 +-
 .../nondecimal/CompressionNonDecimalLong.java   |   6 +-
 .../CompressionNonDecimalMaxMinByte.java        |   6 +-
 .../CompressionNonDecimalMaxMinDefault.java     |   6 +-
 .../CompressionNonDecimalMaxMinInt.java         |   6 +-
 .../CompressionNonDecimalMaxMinLong.java        |   6 +-
 .../CompressionNonDecimalMaxMinShort.java       |   6 +-
 .../nondecimal/CompressionNonDecimalShort.java  |   6 +-
 .../compression/none/CompressionNoneByte.java   |   6 +-
 .../none/CompressionNoneDefault.java            |   6 +-
 .../compression/none/CompressionNoneInt.java    |   6 +-
 .../compression/none/CompressionNoneLong.java   |   6 +-
 .../compression/none/CompressionNoneShort.java  |   6 +-
 .../dataholder/CarbonWriteDataHolder.java       | 156 +--
 .../HeavyCompressedDoubleArrayDataStore.java    |  57 --
 .../core/datastore/page/ColumnPage.java         |  42 +
 .../core/datastore/page/ComplexColumnPage.java  |  77 ++
 .../datastore/page/FixLengthColumnPage.java     | 155 +++
 .../datastore/page/VarLengthColumnPage.java     |  42 +
 .../datastore/page/compression/Compression.java |  23 +
 .../datastore/page/encoding/ColumnCodec.java    |  35 +
 .../datastore/page/encoding/DummyCodec.java     |  37 +
 .../page/statistics/PageStatistics.java         | 124 +++
 .../page/statistics/StatisticsCollector.java    |  66 ++
 .../core/metadata/ValueEncoderMeta.java         |  18 +-
 .../core/metadata/datatype/DataType.java        |   6 +-
 .../apache/carbondata/core/util/ByteUtil.java   |   2 +
 .../core/util/CarbonMetadataUtil.java           |  44 +-
 .../carbondata/core/util/CarbonProperties.java  |   1 -
 .../apache/carbondata/core/util/CarbonUtil.java |   2 +-
 .../carbondata/core/util/CompressionFinder.java |  10 +-
 .../carbondata/core/util/DataTypeUtil.java      |   3 -
 .../apache/carbondata/core/util/NodeHolder.java |  31 +-
 .../core/util/ValueCompressionUtil.java         | 190 ++--
 .../core/util/CarbonMetadataUtilTest.java       |  11 +-
 .../core/util/ValueCompressionUtilTest.java     | 114 +--
 .../core/writer/CarbonFooterWriterTest.java     |   2 +-
 .../examples/CarbonSessionExample.scala         |   1 +
 format/src/main/thrift/carbondata.thrift        |   8 +-
 .../testsuite/emptyrow/TestEmptyRows.scala      |   2 +-
 .../dataload/TestLoadDataWithNoMeasure.scala    |   8 +-
 .../ColumnGroupDataTypesTestCase.scala          |   6 +
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   2 +-
 .../merger/CompactionResultSortProcessor.java   |  17 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   2 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   2 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 133 +--
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  10 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |  29 +-
 .../merger/UnsafeIntermediateFileMerger.java    |  38 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |  10 +-
 .../sortdata/IntermediateFileMerger.java        |  37 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  29 +-
 .../sortandgroupby/sortdata/SortParameters.java |  25 +-
 .../sortdata/SortTempFileChunkHolder.java       |  29 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 975 ++++++++-----------
 .../store/CarbonFactDataHandlerModel.java       |  27 +-
 .../store/SingleThreadFinalSortFilesMerger.java |  11 +-
 .../store/colgroup/ColGroupBlockStorage.java    |  29 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   4 +-
 .../util/CarbonDataProcessorUtil.java           |  50 +-
 .../store/colgroup/ColGroupMinMaxTest.java      |   2 +
 81 files changed, 1727 insertions(+), 1306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
index 8e43684..8360a68 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
@@ -17,7 +17,7 @@
 package org.apache.carbondata.core.compression;
 
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * It compresses big int data
@@ -63,6 +63,7 @@ public class BigIntCompressor extends ValueCompressor {
    */
   @Override
   protected Object compressAdaptive(DataType convertedDataType, CarbonWriteDataHolder dataHolder) {
+
     long[] value = dataHolder.getWritableLongValues();
     return compressValue(convertedDataType, value, 0, false);
   }
@@ -82,7 +83,7 @@ public class BigIntCompressor extends ValueCompressor {
   protected Object compressValue(DataType convertedDataType, long[] value, long maxValue,
       boolean isMinMax) {
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         if (isMinMax) {
           for (int j = 0; j < value.length; j++) {
@@ -94,7 +95,7 @@ public class BigIntCompressor extends ValueCompressor {
           }
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         if (isMinMax) {
           for (int j = 0; j < value.length; j++) {
@@ -106,7 +107,7 @@ public class BigIntCompressor extends ValueCompressor {
           }
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         if (isMinMax) {
           for (int j = 0; j < value.length; j++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
index 840709d..bc2d6f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.compression;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * Double compressor
@@ -33,7 +33,7 @@ public class DoubleCompressor extends ValueCompressor {
     BigDecimal max = BigDecimal.valueOf((double)maxValue);
     double[] value = dataHolder.getWritableDoubleValues();
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -42,7 +42,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -51,7 +51,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -60,7 +60,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return intResult;
-      case DATA_LONG:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -69,7 +69,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -96,35 +96,35 @@ public class DoubleCompressor extends ValueCompressor {
     int i = 0;
     double[] value = dataHolder.getWritableDoubleValues();
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           result[i] = (byte) (Math.round(Math.pow(10, decimal) * value[j]));
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           shortResult[i] = (short) (Math.round(Math.pow(10, decimal) * value[j]));
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           intResult[i] = (int) (Math.round(Math.pow(10, decimal) * value[j]));
           i++;
         }
         return intResult;
-      case DATA_LONG:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           longResult[i] = Math.round(Math.pow(10, decimal) * value[j]);
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           floatResult[i] = (float) (Math.round(Math.pow(10, decimal) * value[j]));
@@ -148,35 +148,35 @@ public class DoubleCompressor extends ValueCompressor {
     double[] value = dataHolder.getWritableDoubleValues();
     int i = 0;
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           result[i] = (byte) (maxValue - value[j]);
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           shortResult[i] = (short) (maxValue - value[j]);
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           intResult[i] = (int) (maxValue - value[j]);
           i++;
         }
         return intResult;
-      case DATA_LONG:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           longResult[i] = (long) (maxValue - value[j]);
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           floatResult[i] = (float) (maxValue - value[j]);
@@ -198,36 +198,35 @@ public class DoubleCompressor extends ValueCompressor {
     double[] value = dataHolder.getWritableDoubleValues();
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           result[i] = (byte) value[j];
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           shortResult[i] = (short) value[j];
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           intResult[i] = (int) value[j];
           i++;
         }
         return intResult;
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           longResult[i] = (long) value[j];
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           floatResult[i] = (float) value[j];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
index 230507f..16f8ac1 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
@@ -17,10 +17,9 @@
 package org.apache.carbondata.core.compression;
 
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
-
 /**
  * Measure compressor
  */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7c59a59..627975a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -647,12 +647,6 @@ public final class CarbonCommonConstants {
   public static final char BIG_INT_MEASURE = 'd';
 
   /**
-   * This determines the size of array to be processed in data load steps. one
-   * for dimensions , one of ignore dictionary dimensions , one for measures.
-   */
-  public static final int ARRAYSIZE = 3;
-
-  /**
    * CARBON_PREFETCH_BUFFERSIZE
    */
   public static final String CARBON_PREFETCH_BUFFERSIZE = "carbon.prefetch.buffersize";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
index 2a3a2a7..12bfea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
@@ -30,8 +30,8 @@ import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeDouble
 import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeIntMeasureChunkStore;
 import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeLongMeasureChunkStore;
 import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeShortMeasureChunkStore;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 /**
  * Factory class for getting the measure store type
@@ -67,33 +67,33 @@ public class MeasureChunkStoreFactory {
   public MeasureDataChunkStore getMeasureDataChunkStore(DataType dataType, int numberOfRows) {
     if (!isUnsafe) {
       switch (dataType) {
-        case DATA_BYTE:
+        case BYTE:
           return new SafeByteMeasureChunkStore(numberOfRows);
-        case DATA_SHORT:
+        case SHORT:
           return new SafeShortMeasureChunkStore(numberOfRows);
-        case DATA_INT:
+        case INT:
           return new SafeIntMeasureChunkStore(numberOfRows);
-        case DATA_LONG:
+        case LONG:
           return new SafeLongMeasureChunkStore(numberOfRows);
-        case DATA_BIGDECIMAL:
+        case DECIMAL:
           return new SafeBigDecimalMeasureChunkStore(numberOfRows);
-        case DATA_DOUBLE:
+        case DOUBLE:
         default:
           return new SafeDoubleMeasureChunkStore(numberOfRows);
       }
     } else {
       switch (dataType) {
-        case DATA_BYTE:
+        case BYTE:
           return new UnsafeByteMeasureChunkStore(numberOfRows);
-        case DATA_SHORT:
+        case SHORT:
           return new UnsafeShortMeasureChunkStore(numberOfRows);
-        case DATA_INT:
+        case INT:
           return new UnsafeIntMeasureChunkStore(numberOfRows);
-        case DATA_LONG:
+        case LONG:
           return new UnsafeLongMeasureChunkStore(numberOfRows);
-        case DATA_BIGDECIMAL:
+        case DECIMAL:
           return new UnsafeBigDecimalMeasureChunkStore(numberOfRows);
-        case DATA_DOUBLE:
+        case DOUBLE:
         default:
           return new UnsafeDoubleMeasureChunkStore(numberOfRows);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
index 0ef2518..f36ee97 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
@@ -37,7 +37,7 @@ public class BlockIndexerStorageForNoInvertedIndex implements IndexStorage<int[]
   private byte[] min;
   private byte[] max;
 
-  public BlockIndexerStorageForNoInvertedIndex(byte[][] keyBlockInput, boolean isNoDictionary) {
+  public BlockIndexerStorageForNoInvertedIndex(byte[][] keyBlockInput) {
     this.keyBlock = keyBlockInput;
     min = keyBlock[0];
     max = keyBlock[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index 731df96..901084f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -36,8 +36,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   private byte[] min;
   private byte[] max;
 
-  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] keyBlockInput,
-      boolean isNoDictionary) {
+  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] keyBlockInput) {
     this.keyBlock = keyBlockInput;
     min = keyBlock[0];
     max = keyBlock[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
index 5d35b0d..7a39f7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
 public class MeasureMetaDataModel {
   /**
    * maxValue
@@ -46,7 +48,7 @@ public class MeasureMetaDataModel {
   /**
    * type
    */
-  private char[] type;
+  private DataType[] type;
 
   /**
    * dataTypeSelected
@@ -54,7 +56,7 @@ public class MeasureMetaDataModel {
   private byte[] dataTypeSelected;
 
   public MeasureMetaDataModel(Object[] minValue, Object[] maxValue, int[] mantissa,
-      int measureCount, Object[] uniqueValue, char[] type, byte[] dataTypeSelected) {
+      int measureCount, Object[] uniqueValue, DataType[] type, byte[] dataTypeSelected) {
     this.minValue = minValue;
     this.maxValue = maxValue;
     this.mantissa = mantissa;
@@ -112,7 +114,7 @@ public class MeasureMetaDataModel {
   /**
    * @return the type
    */
-  public char[] getType() {
+  public DataType[] getType() {
     return type;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
index 101a24b..60687d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
@@ -18,13 +18,13 @@
 package org.apache.carbondata.core.datastore.compression;
 
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 // Used in read path for decompression preparation
 public class ReaderCompressModel {
   private ValueEncoderMeta valueEncoderMeta;
 
-  private ValueCompressionUtil.DataType convertedDataType;
+  private DataType convertedDataType;
 
   private ValueCompressionHolder valueHolder;
 
@@ -32,11 +32,11 @@ public class ReaderCompressModel {
     this.valueEncoderMeta = valueEncoderMeta;
   }
 
-  public ValueCompressionUtil.DataType getConvertedDataType() {
+  public DataType getConvertedDataType() {
     return convertedDataType;
   }
 
-  public void setConvertedDataType(ValueCompressionUtil.DataType convertedDataType) {
+  public void setConvertedDataType(DataType convertedDataType) {
     this.convertedDataType = convertedDataType;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
index 9b3a18a..614eb32 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.datastore.compression;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * ValueCompressionHolder is the base class for handling
@@ -40,24 +40,23 @@ public abstract class ValueCompressionHolder<T> {
   protected void unCompress(Compressor compressor, DataType dataType, byte[] data, int offset,
       int length, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     switch (dataType) {
-      case DATA_BYTE:
+      case BYTE:
         setValue((T) compressor.unCompressByte(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_SHORT:
+      case SHORT:
         setValue((T) compressor.unCompressShort(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_INT:
+      case INT:
         setValue((T) compressor.unCompressInt(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         setValue((T) compressor.unCompressLong(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_FLOAT:
+      case FLOAT:
         setValue((T) compressor.unCompressFloat(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
@@ -75,18 +74,17 @@ public abstract class ValueCompressionHolder<T> {
    */
   public byte[] compress(Compressor compressor, DataType dataType, Object data) {
     switch (dataType) {
-      case DATA_BYTE:
+      case BYTE:
         return compressor.compressByte((byte[]) data);
-      case DATA_SHORT:
+      case SHORT:
         return compressor.compressShort((short[]) data);
-      case DATA_INT:
+      case INT:
         return compressor.compressInt((int[]) data);
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         return compressor.compressLong((long[]) data);
-      case DATA_FLOAT:
+      case FLOAT:
         return compressor.compressFloat((float[]) data);
-      case DATA_DOUBLE:
+      case DOUBLE:
       default:
         return compressor.compressDouble((double[]) data);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
index 0430c8c..a2bf47a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
@@ -17,19 +17,24 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 
+import static org.apache.carbondata.core.metadata.datatype.DataType.INT;
+import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT;
+
 public class WriterCompressModel {
 
   /**
    * DataType[]  variable.
    */
-  private ValueCompressionUtil.DataType[] convertedDataType;
+  private DataType[] convertedDataType;
   /**
    * DataType[]  variable.
    */
-  private ValueCompressionUtil.DataType[] actualDataType;
+  private DataType[] actualDataType;
 
   /**
    * maxValue
@@ -52,7 +57,7 @@ public class WriterCompressModel {
   /**
    * aggType
    */
-  private char[] type;
+  private DataType[] type;
 
   /**
    * dataTypeSelected
@@ -68,28 +73,28 @@ public class WriterCompressModel {
   /**
    * @return the convertedDataType
    */
-  public ValueCompressionUtil.DataType[] getConvertedDataType() {
+  public DataType[] getConvertedDataType() {
     return convertedDataType;
   }
 
   /**
    * @param convertedDataType the convertedDataType to set
    */
-  public void setConvertedDataType(ValueCompressionUtil.DataType[] convertedDataType) {
+  public void setConvertedDataType(DataType[] convertedDataType) {
     this.convertedDataType = convertedDataType;
   }
 
   /**
    * @return the actualDataType
    */
-  public ValueCompressionUtil.DataType[] getActualDataType() {
+  public DataType[] getActualDataType() {
     return actualDataType;
   }
 
   /**
    * @param actualDataType
    */
-  public void setActualDataType(ValueCompressionUtil.DataType[] actualDataType) {
+  public void setActualDataType(DataType[] actualDataType) {
     this.actualDataType = actualDataType;
   }
 
@@ -159,13 +164,33 @@ public class WriterCompressModel {
    * @return the aggType
    */
   public char[] getType() {
+    char[] ret = new char[type.length];
+    for (int i = 0; i < ret.length; i++) {
+      switch (type[i]) {
+        case SHORT:
+        case INT:
+        case LONG:
+          ret[i] = CarbonCommonConstants.BIG_INT_MEASURE;
+          break;
+        case DOUBLE:
+          ret[i] = CarbonCommonConstants.DOUBLE_MEASURE;
+          break;
+        case DECIMAL:
+          ret[i] = CarbonCommonConstants.BIG_DECIMAL_MEASURE;
+          break;
+      }
+    }
+    return ret;
+  }
+
+  public DataType[] getDataType() {
     return type;
   }
 
   /**
    * @param type the type to set
    */
-  public void setType(char[] type) {
+  public void setType(DataType[] type) {
     this.type = type;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
index e517e41..7a36e66 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressByteArray extends ValueCompressionHolder<byte[]> {
 
@@ -101,7 +101,7 @@ public class CompressByteArray extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BIGDECIMAL, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DECIMAL, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
index ed50cd3..45e3b47 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionMaxMinByte extends ValueCompressionHolder<byte[]> {
 
@@ -68,7 +68,7 @@ public class CompressionMaxMinByte extends ValueCompressionHolder<byte[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -103,7 +103,7 @@ public class CompressionMaxMinByte extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
index 6550c46..6bd1947 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> {
 
@@ -70,7 +70,7 @@ public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -106,7 +106,7 @@ public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> {
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
index 512cf2c..60ddfea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinInt extends ValueCompressionHolder<int[]> {
   /**
@@ -72,7 +72,7 @@ public class CompressionMaxMinInt extends ValueCompressionHolder<int[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -102,7 +102,7 @@ public class CompressionMaxMinInt extends ValueCompressionHolder<int[]> {
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
index ca91d44..159e741 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinLong extends ValueCompressionHolder<long[]> {
   /**
@@ -57,7 +57,7 @@ public class CompressionMaxMinLong extends ValueCompressionHolder<long[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override public void setValue(long[] value) {
@@ -102,7 +102,7 @@ public class CompressionMaxMinLong extends ValueCompressionHolder<long[]> {
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
index d44875a..1d36375 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinShort extends ValueCompressionHolder<short[]> {
 
@@ -74,7 +74,7 @@ public class CompressionMaxMinShort extends ValueCompressionHolder<short[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+    compressedValue = super.compress(compressor, DataType.SHORT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -104,7 +104,7 @@ public class CompressionMaxMinShort extends ValueCompressionHolder<short[]> {
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
index 13f962f..1c46dea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionNonDecimalByte extends ValueCompressionHolder<byte[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalByte extends ValueCompressionHolder<byte[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -92,7 +92,7 @@ public class CompressionNonDecimalByte extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
index 15c7bb3..40312db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalDefault extends ValueCompressionHolder<double[]> {
   /**
@@ -56,7 +56,7 @@ public class CompressionNonDecimalDefault extends ValueCompressionHolder<double[
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void setValue(double[] value) {
@@ -94,7 +94,7 @@ public class CompressionNonDecimalDefault extends ValueCompressionHolder<double[
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
index 7e3606e..ada751e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalInt extends ValueCompressionHolder<int[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalInt extends ValueCompressionHolder<int[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override public void setValueInBytes(byte[] bytesArr) {
@@ -93,7 +93,7 @@ public class CompressionNonDecimalInt extends ValueCompressionHolder<int[]> {
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
index d810972..f0b2323 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalLong extends ValueCompressionHolder<long[]> {
   /**
@@ -60,7 +60,7 @@ public class CompressionNonDecimalLong extends ValueCompressionHolder<long[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -95,7 +95,7 @@ public class CompressionNonDecimalLong extends ValueCompressionHolder<long[]> {
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
index 83e8a1a..c425706 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -97,7 +97,7 @@ public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
index c408d9a..521328e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<double[]> {
   /**
@@ -62,7 +62,7 @@ public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<d
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -99,7 +99,7 @@ public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<d
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
index 40b1099..efd21d5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]> {
   /**
@@ -60,7 +60,7 @@ public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -97,7 +97,7 @@ public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
index f8d8ed5..d2e2771 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long[]> {
   /**
@@ -68,7 +68,7 @@ public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -98,7 +98,7 @@ public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
index 14648ba..241344a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<short[]> {
   /**
@@ -66,7 +66,7 @@ public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<sho
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+    compressedValue = super.compress(compressor, DataType.SHORT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -96,7 +96,7 @@ public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<sho
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
index 8536630..4737e10 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]>
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+    compressedValue = super.compress(compressor, DataType.SHORT, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -93,7 +93,7 @@ public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]>
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
index acd73d9..13af929 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
   /**
@@ -72,7 +72,7 @@ public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -98,7 +98,7 @@ public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
index 8e02fd8..cca7927 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
   /**
@@ -69,7 +69,7 @@ public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -97,7 +97,7 @@ public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
index f0c0311..f5bb813 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
   /**
@@ -62,7 +62,7 @@ public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override
@@ -96,7 +96,7 @@ public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
index ee72c8a..99f24ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneLong extends ValueCompressionHolder<long[]> {
   /**
@@ -62,7 +62,7 @@ public class CompressionNoneLong extends ValueCompressionHolder<long[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override
@@ -96,7 +96,7 @@ public class CompressionNoneLong extends ValueCompressionHolder<long[]> {
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
index d4289ab..664f9e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneShort extends ValueCompressionHolder<short[]> {
   /**
@@ -71,7 +71,7 @@ public class CompressionNoneShort extends ValueCompressionHolder<short[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, shortValue);
+    compressedValue = super.compress(compressor, DataType.SHORT, shortValue);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -98,7 +98,7 @@ public class CompressionNoneShort extends ValueCompressionHolder<short[]> {
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
index fb21d95..8d3cc0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
@@ -34,137 +34,57 @@ public class CarbonWriteDataHolder {
   private byte[][] byteValues;
 
   /**
-   * byteValues for no dictionary.
-   */
-  private byte[][][] byteValuesForNonDictionary;
-
-  /**
-   * byteValues
-   */
-  private byte[][][] columnByteValues;
-
-  /**
    * size
    */
   private int size;
 
   /**
-   * totalSize
+   * total size of the data in bytes added
    */
   private int totalSize;
 
-  /**
-   * Method to initialise double array
-   *
-   * @param size
-   */
-  public void initialiseDoubleValues(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
-    }
-    doubleValues = new double[size];
-  }
-
   public void reset() {
     size = 0;
     totalSize = 0;
   }
 
   /**
-   * Method to initialise double array
-   *
-   * @param size
-   */
-  public void initialiseByteArrayValues(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
-    }
-
-    byteValues = new byte[size][];
-    columnByteValues = new byte[size][][];
-  }
-
-  /**
-   * Method to initialise byte array
-   *
-   * @param size
+   * set long data type columnar data
+   * @param values
    */
-  public void initialiseByteArrayValuesForKey(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
-    }
-
-    byteValues = new byte[size][];
-  }
-
-  public void initialiseByteArrayValuesForNonDictionary(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
+  public void setWritableLongPage(long[] values) {
+    if (values != null) {
+      longValues = values;
+      size += values.length;
+      totalSize += values.length;
     }
-
-    byteValuesForNonDictionary = new byte[size][][];
   }
 
   /**
-   * Method to initialise long array
-   *
-   * @param size
+   * set double data type columnar data
+   * @param values
    */
-  public void initialiseLongValues(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
+  public void setWritableDoublePage(double[] values) {
+    if (values != null) {
+      doubleValues = values;
+      size += values.length;
+      totalSize += values.length;
     }
-    longValues = new long[size];
-  }
-
-  /**
-   * set double value by index
-   *
-   * @param index
-   * @param value
-   */
-  public void setWritableDoubleValueByIndex(int index, Object value) {
-    doubleValues[index] = (Double) value;
-    size++;
   }
 
   /**
-   * set double value by index
-   *
-   * @param index
-   * @param value
-   */
-  public void setWritableLongValueByIndex(int index, Object value) {
-    longValues[index] = (Long) value;
-    size++;
-  }
-
-  /**
-   * set byte array value by index
-   *
-   * @param index
-   * @param value
+   * set decimal data type columnar data
+   * @param values
    */
-  public void setWritableByteArrayValueByIndex(int index, byte[] value) {
-    byteValues[index] = value;
-    size++;
-    if (null != value) totalSize += value.length;
-  }
-
-  public void setWritableNonDictByteArrayValueByIndex(int index, byte[][] value) {
-    byteValuesForNonDictionary[index] = value;
-    size++;
-    if (null != value) totalSize += value.length;
-  }
-
-  /**
-   * set byte array value by index
-   */
-  public void setWritableByteArrayValueByIndex(int index, int mdKeyIndex, Object[] columnData) {
-    int l = 0;
-    columnByteValues[index] = new byte[columnData.length - (mdKeyIndex + 1)][];
-    for (int i = mdKeyIndex + 1; i < columnData.length; i++) {
-      columnByteValues[index][l++] = (byte[]) columnData[i];
+  public void setWritableDecimalPage(byte[][] values) {
+    if (values != null) {
+      byteValues = values;
+      size += values.length;
+      for (int i = 0; i < values.length; i++) {
+        if (values[i] != null) {
+          totalSize += values[i].length;
+        }
+      }
     }
   }
 
@@ -187,30 +107,14 @@ public class CarbonWriteDataHolder {
     byte[] temp = new byte[totalSize];
     int startIndexToCopy = 0;
     for (int i = 0; i < size; i++) {
-      System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length);
-      startIndexToCopy += byteValues[i].length;
+      if (byteValues[i] != null) {
+        System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length);
+        startIndexToCopy += byteValues[i].length;
+      }
     }
     return temp;
   }
 
-  public byte[][] getByteArrayValues() {
-    if (size < byteValues.length) {
-      byte[][] temp = new byte[size][];
-      System.arraycopy(byteValues, 0, temp, 0, size);
-      byteValues = temp;
-    }
-    return byteValues;
-  }
-
-  public byte[][][] getNonDictByteArrayValues() {
-    if (size < byteValuesForNonDictionary.length) {
-      byte[][][] temp = new byte[size][][];
-      System.arraycopy(byteValuesForNonDictionary, 0, temp, 0, size);
-      byteValuesForNonDictionary = temp;
-    }
-    return byteValuesForNonDictionary;
-  }
-
   /**
    * Get Writable Double Values
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
deleted file mode 100644
index d3d67fd..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.impl.data.compressed;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public class HeavyCompressedDoubleArrayDataStore {
-
-  // this method first invokes encoding routine to encode the data chunk,
-  // followed by invoking compression routine for preparing the data chunk for writing.
-  public static byte[][] encodeMeasureDataArray(
-      WriterCompressModel compressionModel,
-      CarbonWriteDataHolder[] dataHolder) {
-    char[] type = compressionModel.getType();
-    ValueCompressionHolder[] values =
-        new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
-    byte[][] returnValue = new byte[values.length][];
-    for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
-      values[i] = compressionModel.getValueCompressionHolder()[i];
-      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
-          && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        // first perform encoding of the data chunk
-        values[i].setValue(
-            ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
-                .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
-                    compressionModel.getMaxValue()[i],
-                    compressionModel.getMantissa()[i]));
-      } else {
-        values[i].setValue(dataHolder[i].getWritableByteArrayValues());
-      }
-      values[i].compress();
-      returnValue[i] = values[i].getCompressedData();
-    }
-
-    return returnValue;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
new file mode 100644
index 0000000..25a813c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.datastore.page.statistics.PageStatistics;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public class ColumnPage {
+
+  protected final DataType dataType;
+  protected final int pageSize;
+  protected PageStatistics stats;
+
+  protected ColumnPage(DataType dataType, int pageSize) {
+    this.dataType = dataType;
+    this.pageSize = pageSize;
+    this.stats = new PageStatistics(dataType);
+  }
+
+  protected void updateStatistics(Object value) {
+    stats.update(value);
+  }
+
+  public PageStatistics getStatistics() {
+    return stats;
+  }
+}


[3/5] carbondata git commit: refactor write step based on ColumnPage

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
new file mode 100644
index 0000000..024c341
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+// Represent a complex column page, e.g. Array, Struct type column
+public class ComplexColumnPage extends ColumnPage {
+
+  // Holds data for all rows in this page in columnar layout.
+  // After the complex data expand, it is of type byte[][], the first level array in the byte[][]
+  // representing a sub-column in the complex type, which can be retrieved by giving the depth
+  // of the complex type.
+  // TODO: further optimize it to make it more memory efficient
+  private List<ArrayList<byte[]>> complexColumnData;
+
+  // depth is the number of column after complex type is expanded. It is from 1 to N
+  private final int depth;
+
+  public ComplexColumnPage(int pageSize, int depth) {
+    super(DataType.BYTE_ARRAY, pageSize);
+    this.depth = depth;
+    complexColumnData = new ArrayList<>(depth);
+    for (int i = 0; i < depth; i++) {
+      complexColumnData.add(new ArrayList<byte[]>(pageSize));
+    }
+  }
+
+  public void putComplexData(int rowId, int depth, List<byte[]> value) {
+    assert (depth <= this.depth);
+    ArrayList<byte[]> subColumnPage = complexColumnData.get(depth);
+    subColumnPage.addAll(value);
+  }
+
+  // iterate on the sub-column after complex type is expanded, return columnar page of
+  // each sub-column
+  public Iterator<byte[][]> iterator() {
+
+    return new CarbonIterator<byte[][]>() {
+      private int index = 0;
+      @Override public boolean hasNext() {
+        return index < depth;
+      }
+
+      @Override public byte[][] next() {
+        // convert the subColumnPage from ArrayList<byte[]> to byte[][]
+        ArrayList<byte[]> subColumnPage = complexColumnData.get(index);
+        index++;
+        return subColumnPage.toArray(new byte[subColumnPage.size()][]);
+      }
+    };
+  }
+
+  public int getDepth() {
+    return depth;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
new file mode 100644
index 0000000..a56563e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+// Represent a columnar data in one page for one column.
+public class FixLengthColumnPage extends ColumnPage {
+
+  // Only one of following fields will be used
+  private byte[] byteData;
+  private short[] shortData;
+  private int[] intData;
+  private long[] longData;
+  private double[] doubleData;
+
+  private byte[][] byteArrayData;
+
+  // The index of the rowId whose value is null, will be set to 1
+  private BitSet nullBitSet;
+
+  public FixLengthColumnPage(DataType dataType, int pageSize) {
+    super(dataType, pageSize);
+    nullBitSet = new BitSet(pageSize);
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        longData = new long[pageSize];
+        break;
+      case DOUBLE:
+        doubleData = new double[pageSize];
+        break;
+      case DECIMAL:
+        byteArrayData = new byte[pageSize][];
+        break;
+      default:
+        throw new RuntimeException("Unsupported data dataType: " + dataType);
+    }
+  }
+
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  private void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  private void putShort(int rowId, short value) {
+    shortData[rowId] = value;
+  }
+
+  private void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  private void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  private void putDouble(int rowId, double value) {
+    doubleData[rowId] = value;
+  }
+
+  // This method will do LV (length value) coded of input bytes
+  private void putDecimalBytes(int rowId, byte[] decimalInBytes) {
+    ByteBuffer byteBuffer = ByteBuffer.allocate(decimalInBytes.length +
+        CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    byteBuffer.putInt(decimalInBytes.length);
+    byteBuffer.put(decimalInBytes);
+    byteBuffer.flip();
+    byteArrayData[rowId] = byteBuffer.array();
+  }
+
+  public void putData(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+      return;
+    }
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        putLong(rowId, (long) value);
+        break;
+      case DOUBLE:
+        putDouble(rowId, (double) value);
+        break;
+      case DECIMAL:
+        putDecimalBytes(rowId, (byte[]) value);
+        break;
+      default:
+        throw new RuntimeException("unsupported data type: " + dataType);
+    }
+    updateStatistics(value);
+  }
+
+  private void putNull(int rowId) {
+    nullBitSet.set(rowId);
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        putLong(rowId, 0L);
+        break;
+      case DOUBLE:
+        putDouble(rowId, 0.0);
+        break;
+      case DECIMAL:
+        byte[] decimalInBytes = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
+        putDecimalBytes(rowId, decimalInBytes);
+        break;
+    }
+  }
+
+  public long[] getLongPage() {
+    return longData;
+  }
+
+  public double[] getDoublePage() {
+    return doubleData;
+  }
+
+  public byte[][] getDecimalPage() {
+    return byteArrayData;
+  }
+
+  public BitSet getNullBitSet() {
+    return nullBitSet;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
new file mode 100644
index 0000000..d5e9ce3
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+// Represent a variable length columnar data in one page, e.g. for dictionary columns.
+public class VarLengthColumnPage extends ColumnPage {
+
+  // TODO: further optimizite it, to store length and data separately
+  private byte[][] byteArrayData;
+
+  public VarLengthColumnPage(int pageSize) {
+    super(DataType.BYTE_ARRAY, pageSize);
+    byteArrayData = new byte[pageSize][];
+  }
+
+  public void putByteArray(int rowId, byte[] value) {
+    byteArrayData[rowId] = value;
+    updateStatistics(value);
+  }
+
+  public byte[][] getByteArrayPage() {
+    return byteArrayData;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
new file mode 100644
index 0000000..c954a33
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/compression/Compression.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.compression;
+
+public interface Compression {
+  byte[] compress(byte[] input);
+  byte[] decompress(byte[] input);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
new file mode 100644
index 0000000..e870ad6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnCodec.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ *  Codec for a column page data, implementation should not keep state across pages,
+ *  caller will use the same object to encode multiple pages.
+ */
+public interface ColumnCodec {
+
+  /** Codec name will be stored in BlockletHeader (DataChunk3) */
+  String getName();
+
+  byte[] encode(ColumnPage columnPage);
+
+  ColumnPage decode(byte[] encoded);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
new file mode 100644
index 0000000..0dd23c7
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DummyCodec.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.encoding;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+public class DummyCodec implements ColumnCodec {
+  @Override
+  public String getName() {
+    return "DummyCodec";
+  }
+
+  @Override
+  public byte[] encode(ColumnPage columnPage) {
+    return null;
+  }
+
+  @Override
+  public ColumnPage decode(byte[] encoded) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
new file mode 100644
index 0000000..3ecf1da
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/** statics for one column page */
+public class PageStatistics {
+  private DataType dataType;
+
+  /** min and max value of the measures */
+  private Object min, max;
+
+  /**
+   * the unique value is the non-exist value in the row,
+   * and will be used as storage key for null values of measures
+   */
+  private Object uniqueValue;
+
+  /** decimal count of the measures */
+  private int decimal;
+
+  public PageStatistics(DataType dataType) {
+    this.dataType = dataType;
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        max = Long.MIN_VALUE;
+        min = Long.MAX_VALUE;
+        uniqueValue = Long.MIN_VALUE;
+        break;
+      case DOUBLE:
+        max = Double.MIN_VALUE;
+        min = Double.MAX_VALUE;
+        uniqueValue = Double.MIN_VALUE;
+        break;
+      case DECIMAL:
+        max = new BigDecimal(Double.MIN_VALUE);
+        min = new BigDecimal(Double.MAX_VALUE);
+        uniqueValue = new BigDecimal(Double.MIN_VALUE);
+        break;
+    }
+    decimal = 0;
+  }
+
+  /**
+   * update the statistics for the input row
+   */
+  public void update(Object value) {
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        max = ((long) max > (long) value) ? max : value;
+        min = ((long) min < (long) value) ? min : value;
+        uniqueValue = (long) min - 1;
+        break;
+      case DOUBLE:
+        max = ((double) max > (double) value) ? max : value;
+        min = ((double) min < (double) value) ? min : value;
+        int num = getDecimalCount((double) value);
+        decimal = decimal > num ? decimal : num;
+        uniqueValue = (double) min - 1;
+        break;
+      case DECIMAL:
+        BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
+        decimal = decimalValue.scale();
+        BigDecimal val = (BigDecimal) min;
+        uniqueValue = (val.subtract(new BigDecimal(1.0)));
+        break;
+      case ARRAY:
+      case STRUCT:
+        // for complex type column, writer is not going to use stats, so, do nothing
+    }
+  }
+
+  /**
+   * return no of digit after decimal
+   */
+  private int getDecimalCount(double value) {
+    String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
+    int integerPlaces = strValue.indexOf('.');
+    int decimalPlaces = 0;
+    if (-1 != integerPlaces) {
+      decimalPlaces = strValue.length() - integerPlaces - 1;
+    }
+    return decimalPlaces;
+  }
+
+  public Object getMin() {
+    return min;
+  }
+
+  public Object getMax() {
+    return max;
+  }
+
+  public Object getUniqueValue() {
+    return uniqueValue;
+  }
+
+  public int getDecimal() {
+    return decimal;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
new file mode 100644
index 0000000..f8b336c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/StatisticsCollector.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+
+/**
+ * Calculate the statistics for a column page and blocklet
+ */
+public interface StatisticsCollector {
+
+  /**
+   * name will be stored in Header
+   */
+  String getName();
+
+  void startPage(int pageID);
+
+  void endPage(int pageID);
+
+  void startBlocklet(int blockletID);
+
+  void endBlocklet(int blockletID);
+
+  void startBlock(int blocklID);
+
+  void endBlock(int blockID);
+
+  /**
+   * Update the stats for the input batch
+   */
+  void update(ColumnPage batch);
+
+  /**
+   * Ouput will be written to DataChunk2 (page header)
+   */
+  byte[] getPageStatistisc();
+
+  /**
+   * Output will be written to DataChunk3 (blocklet header)
+   */
+  byte[] getBlockletStatistics();
+
+  /**
+   * Output will be written to Footer
+   */
+  byte[] getBlockStatistics();
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
index 4a9007c..741b999 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
@@ -19,6 +19,9 @@ package org.apache.carbondata.core.metadata;
 
 import java.io.Serializable;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
 /**
  * DO NOT MODIFY THIS CLASS AND PACKAGE NAME, BECAUSE
  * IT IS SERIALIZE TO STORE
@@ -78,7 +81,20 @@ public class ValueEncoderMeta implements Serializable {
     this.decimal = decimal;
   }
 
-  public char getType() {
+  public DataType getType() {
+    switch (type) {
+      case CarbonCommonConstants.BIG_INT_MEASURE:
+        return DataType.LONG;
+      case CarbonCommonConstants.DOUBLE_MEASURE:
+        return DataType.DOUBLE;
+      case CarbonCommonConstants.BIG_DECIMAL_MEASURE:
+        return DataType.DECIMAL;
+      default:
+        throw new RuntimeException("Unexpected type: " + type);
+    }
+  }
+
+  public char getTypeInChar() {
     return type;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index d77406c..da13d5c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -32,7 +32,11 @@ public enum DataType {
   DECIMAL(8, "DECIMAL"),
   ARRAY(9, "ARRAY"),
   STRUCT(10, "STRUCT"),
-  MAP(11, "MAP");
+  MAP(11, "MAP"),
+  BYTE(12, "BYTE"),
+
+  // internal use only
+  BYTE_ARRAY(13, "BYTE ARRAY");
 
   private int precedenceOrder;
   private String name ;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index f4ab982..caba75f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -39,6 +39,8 @@ public final class ByteUtil {
 
   public static final String UTF8_CSN = StandardCharsets.UTF_8.name();
 
+  public static final byte[] ZERO_IN_BYTES = toBytes(0);
+
   private ByteUtil() {
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 6398f30..6fe38e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -554,7 +554,7 @@ public class CarbonMetadataUtil {
     Object[] minValue = new Object[encoderMetas.length];
     int[] decimalLength = new int[encoderMetas.length];
     Object[] uniqueValue = new Object[encoderMetas.length];
-    char[] aggType = new char[encoderMetas.length];
+    DataType[] aggType = new DataType[encoderMetas.length];
     byte[] dataTypeSelected = new byte[encoderMetas.length];
     for (int i = 0; i < encoderMetas.length; i++) {
       maxValue[i] = encoderMetas[i].getMaxValue();
@@ -827,25 +827,29 @@ public class CarbonMetadataUtil {
 
   public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
     ByteBuffer buffer = null;
-    if (valueEncoderMeta.getType() == CarbonCommonConstants.DOUBLE_MEASURE) {
-      buffer = ByteBuffer.allocate(
-          (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-              + 3);
-      buffer.putChar(valueEncoderMeta.getType());
-      buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
-      buffer.putDouble((Double) valueEncoderMeta.getMinValue());
-      buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
-    } else if (valueEncoderMeta.getType() == CarbonCommonConstants.BIG_INT_MEASURE) {
-      buffer = ByteBuffer.allocate(
-          (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-              + 3);
-      buffer.putChar(valueEncoderMeta.getType());
-      buffer.putLong((Long) valueEncoderMeta.getMaxValue());
-      buffer.putLong((Long) valueEncoderMeta.getMinValue());
-      buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
-    } else {
-      buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
-      buffer.putChar(valueEncoderMeta.getType());
+    switch (valueEncoderMeta.getType()) {
+      case LONG:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        buffer.putLong((Long) valueEncoderMeta.getMaxValue());
+        buffer.putLong((Long) valueEncoderMeta.getMinValue());
+        buffer.putLong((Long) valueEncoderMeta.getUniqueValue());
+        break;
+      case DOUBLE:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
+        buffer.putDouble((Double) valueEncoderMeta.getMinValue());
+        buffer.putDouble((Double) valueEncoderMeta.getUniqueValue());
+        break;
+      case DECIMAL:
+        buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        break;
     }
     buffer.putInt(valueEncoderMeta.getDecimal());
     buffer.put(valueEncoderMeta.getDataTypeSelected());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 92c85a1..496adff 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -749,5 +749,4 @@ public final class CarbonProperties {
     }
     return numberOfDeltaFilesThreshold;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 99463de..8e4df1a 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -833,7 +833,7 @@ public final class CarbonUtil {
     Object[] minValue = new Object[encodeMetaList.size()];
     Object[] uniqueValue = new Object[encodeMetaList.size()];
     int[] decimal = new int[encodeMetaList.size()];
-    char[] type = new char[encodeMetaList.size()];
+    DataType[] type = new DataType[encodeMetaList.size()];
     byte[] dataTypeSelected = new byte[encodeMetaList.size()];
 
     /*

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java b/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
index d931af6..732d053 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CompressionFinder.java
@@ -16,8 +16,8 @@
  */
 package org.apache.carbondata.core.util;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 
 /**
@@ -37,7 +37,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
 
   private PRIORITY priority;
 
-  private char measureStoreType;
+  private DataType measureStoreType;
 
   /**
    * CompressionFinder constructor.
@@ -47,7 +47,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
    * @param convertedDataType
    */
   CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType,
-      DataType convertedDataType, char measureStoreType) {
+      DataType convertedDataType, DataType measureStoreType) {
     super();
     this.compType = compType;
     this.actualDataType = actualDataType;
@@ -65,7 +65,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
    */
 
   CompressionFinder(COMPRESSION_TYPE compType, DataType actualDataType, DataType convertedDataType,
-      PRIORITY priority, char measureStoreType) {
+      PRIORITY priority, DataType measureStoreType) {
     super();
     this.actualDataType = actualDataType;
     this.convertedDataType = convertedDataType;
@@ -155,7 +155,7 @@ public class CompressionFinder implements Comparable<CompressionFinder> {
     return priority;
   }
 
-  public char getMeasureStoreType() {
+  public DataType getMeasureStoreType() {
     return measureStoreType;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 80c9e72..e33d198 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -156,9 +156,6 @@ public final class DataTypeUtil {
     }
   }
 
-  // bytes of 0 in BigDecimal
-  public static final byte[] zeroBigDecimalBytes = bigDecimalToByte(BigDecimal.valueOf(0));
-
   /**
    * This method will convert a big decimal value to bytes
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
index 69ed9f8..a37a9a7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -57,16 +57,6 @@ public class NodeHolder {
   private int[] keyLengths;
 
   /**
-   * dataAfterCompression
-   */
-  private short[][] dataAfterCompression;
-
-  /**
-   * indexMap
-   */
-  private short[][] indexMap;
-
-  /**
    * keyIndexBlockLenght
    */
   private int[] keyBlockIndexLength;
@@ -86,11 +76,6 @@ public class NodeHolder {
   private int[] dataIndexMapLength;
 
   /**
-   * dataIndexMap
-   */
-  private int[] dataIndexMapOffsets;
-
-  /**
    * compressedDataIndex
    */
   private byte[][] compressedDataIndex;
@@ -120,19 +105,9 @@ public class NodeHolder {
   private boolean[] aggBlocks;
 
   /**
-   * all columns max value
-   */
-  private byte[][] allMaxValue;
-
-  /**
-   * all column max value
-   */
-  private byte[][] allMinValue;
-
-  /**
    * true if given index is colgroup block
    */
-  private boolean[] colGrpBlock;
+  private boolean[] colGrpBlocks;
 
   /**
    * bit set which will holds the measure
@@ -383,14 +358,14 @@ public class NodeHolder {
    * @return
    */
   public boolean[] getColGrpBlocks() {
-    return this.colGrpBlock;
+    return this.colGrpBlocks;
   }
 
   /**
    * @param colGrpBlock true if block is column group
    */
   public void setColGrpBlocks(boolean[] colGrpBlock) {
-    this.colGrpBlock = colGrpBlock;
+    this.colGrpBlocks = colGrpBlock;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
index c8a9397..5020acb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
@@ -28,10 +28,29 @@ import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
 import org.apache.carbondata.core.datastore.compression.ReaderCompressModel;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.compression.decimal.*;
-import org.apache.carbondata.core.datastore.compression.nondecimal.*;
-import org.apache.carbondata.core.datastore.compression.none.*;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressByteArray;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinByte;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinDefault;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinInt;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinLong;
+import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinShort;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalByte;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalDefault;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalInt;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalLong;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinByte;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinDefault;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinInt;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinLong;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalMaxMinShort;
+import org.apache.carbondata.core.datastore.compression.nondecimal.CompressionNonDecimalShort;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneByte;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneDefault;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneInt;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneLong;
+import org.apache.carbondata.core.datastore.compression.none.CompressionNoneShort;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public final class ValueCompressionUtil {
 
@@ -47,29 +66,28 @@ public final class ValueCompressionUtil {
    * @see
    */
   private static DataType getDataType(double value, int mantissa, byte dataTypeSelected) {
-    DataType dataType = DataType.DATA_DOUBLE;
+    DataType dataType = DataType.DOUBLE;
     if (mantissa == 0) {
       if (value <= Byte.MAX_VALUE && value >= Byte.MIN_VALUE) {
-        dataType = DataType.DATA_BYTE;
+        dataType = DataType.BYTE;
       } else if (value <= Short.MAX_VALUE && value >= Short.MIN_VALUE) {
-        dataType = DataType.DATA_SHORT;
+        dataType = DataType.SHORT;
       } else if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
-        dataType = DataType.DATA_INT;
+        dataType = DataType.INT;
       } else if (value <= Long.MAX_VALUE && value >= Long.MIN_VALUE) {
-        dataType = DataType.DATA_LONG;
+        dataType = DataType.LONG;
       }
     } else {
       if (dataTypeSelected == 1) {
         if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
           float floatValue = (float) value;
           if (floatValue - value != 0) {
-            dataType = DataType.DATA_DOUBLE;
-
+            dataType = DataType.DOUBLE;
           } else {
-            dataType = DataType.DATA_FLOAT;
+            dataType = DataType.FLOAT;
           }
         } else if (value <= Double.MAX_VALUE && value >= Double.MIN_VALUE) {
-          dataType = DataType.DATA_DOUBLE;
+          dataType = DataType.DOUBLE;
         }
       }
     }
@@ -84,14 +102,14 @@ public final class ValueCompressionUtil {
    * @see
    */
   public static int getSize(DataType dataType) {
-
     switch (dataType) {
-      case DATA_BYTE:
+      case BOOLEAN:
+      case BYTE:
         return 1;
-      case DATA_SHORT:
+      case SHORT:
         return 2;
-      case DATA_INT:
-      case DATA_FLOAT:
+      case INT:
+      case FLOAT:
         return 4;
       default:
         return 8;
@@ -110,19 +128,17 @@ public final class ValueCompressionUtil {
    * @see
    */
   public static CompressionFinder getCompressionFinder(Object maxValue, Object minValue,
-      int mantissa, char measureStoreType, byte dataTypeSelected) {
-    // ''l' for long, 'n' for double
+      int mantissa, DataType measureStoreType, byte dataTypeSelected) {
     switch (measureStoreType) {
-      case 'b':
-        return new CompressionFinder(COMPRESSION_TYPE.BIGDECIMAL, DataType.DATA_BYTE,
-            DataType.DATA_BYTE, measureStoreType);
-      case 'd':
+      case DECIMAL:
+        return new CompressionFinder(COMPRESSION_TYPE.BIGDECIMAL, DataType.BYTE,
+            DataType.BYTE, measureStoreType);
+      case SHORT:
+      case INT:
+      case LONG:
         return getLongCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected,
             measureStoreType);
-      case 'l':
-        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE,
-            DataType.DATA_BIGINT, DataType.DATA_BIGINT, measureStoreType);
-      case 'n':
+      case DOUBLE:
         return getDoubleCompressorFinder(maxValue, minValue, mantissa, dataTypeSelected,
             measureStoreType);
       default:
@@ -131,7 +147,7 @@ public final class ValueCompressionUtil {
   }
 
   private static CompressionFinder getDoubleCompressorFinder(Object maxValue, Object minValue,
-      int mantissa, byte dataTypeSelected, char measureStoreType) {
+      int mantissa, byte dataTypeSelected, DataType measureStoreType) {
     //Here we should use the Max abs as max to getDatatype, let's say -1 and -10000000, -1 is max,
     //but we can't use -1 to getDatatype, we should use -10000000.
     double absMaxValue = Math.abs((double) maxValue) >= Math.abs((double) minValue) ?
@@ -145,13 +161,13 @@ public final class ValueCompressionUtil {
       int adaptiveSize = getSize(adaptiveDataType);
       int deltaSize = getSize(deltaDataType);
       if (adaptiveSize > deltaSize) {
-        return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DATA_DOUBLE,
+        return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DOUBLE,
             deltaDataType, measureStoreType);
       } else if (adaptiveSize < deltaSize) {
-        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_DOUBLE,
+        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DOUBLE,
             deltaDataType, measureStoreType);
       } else {
-        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_DOUBLE,
+        return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DOUBLE,
             adaptiveDataType, measureStoreType);
       }
     } else {
@@ -178,7 +194,7 @@ public final class ValueCompressionUtil {
   }
 
   private static CompressionFinder getLongCompressorFinder(Object maxValue, Object minValue,
-      int mantissa, byte dataTypeSelected, char measureStoreType) {
+      int mantissa, byte dataTypeSelected, DataType measureStoreType) {
     DataType adaptiveDataType = getDataType((long) maxValue, mantissa, dataTypeSelected);
     int adaptiveSize = getSize(adaptiveDataType);
     DataType deltaDataType = null;
@@ -186,20 +202,20 @@ public final class ValueCompressionUtil {
     // consider the scenario when max and min value are equal to is long max and min value OR
     // when the max and min value are resulting in a value greater than long max value, then
     // it is not possible to determine the compression type.
-    if (adaptiveDataType == DataType.DATA_LONG) {
-      deltaDataType = DataType.DATA_BIGINT;
+    if (adaptiveDataType == DataType.LONG) {
+      deltaDataType = DataType.LONG;
     } else {
       deltaDataType = getDataType((long) maxValue - (long) minValue, mantissa, dataTypeSelected);
     }
     int deltaSize = getSize(deltaDataType);
     if (adaptiveSize > deltaSize) {
-      return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.DATA_BIGINT,
+      return new CompressionFinder(COMPRESSION_TYPE.DELTA_DOUBLE, DataType.LONG,
           deltaDataType, measureStoreType);
     } else if (adaptiveSize < deltaSize) {
-      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_BIGINT,
+      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.LONG,
           deltaDataType, measureStoreType);
     } else {
-      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.DATA_BIGINT,
+      return new CompressionFinder(COMPRESSION_TYPE.ADAPTIVE, DataType.LONG,
           adaptiveDataType, measureStoreType);
     }
   }
@@ -234,7 +250,9 @@ public final class ValueCompressionUtil {
    */
   public static ValueCompressor getValueCompressor(CompressionFinder compressorFinder) {
     switch (compressorFinder.getMeasureStoreType()) {
-      case 'd':
+      case SHORT:
+      case INT:
+      case LONG:
         return new BigIntCompressor();
       default:
         return new DoubleCompressor();
@@ -295,7 +313,7 @@ public final class ValueCompressionUtil {
   private static Object compressNone(DataType changedDataType, double[] value) {
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
 
         for (double a : value) {
@@ -304,7 +322,7 @@ public final class ValueCompressionUtil {
         }
         return result;
 
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
 
         for (double a : value) {
@@ -313,7 +331,7 @@ public final class ValueCompressionUtil {
         }
         return shortResult;
 
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
 
         for (double a : value) {
@@ -322,8 +340,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         long[] longResult = new long[value.length];
 
         for (double a : value) {
@@ -332,7 +349,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
 
         for (double a : value) {
@@ -353,7 +370,7 @@ public final class ValueCompressionUtil {
   private static Object compressMaxMin(DataType changedDataType, double[] value, double maxValue) {
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
 
         byte[] result = new byte[value.length];
         for (double a : value) {
@@ -362,7 +379,7 @@ public final class ValueCompressionUtil {
         }
         return result;
 
-      case DATA_SHORT:
+      case SHORT:
 
         short[] shortResult = new short[value.length];
 
@@ -372,7 +389,7 @@ public final class ValueCompressionUtil {
         }
         return shortResult;
 
-      case DATA_INT:
+      case INT:
 
         int[] intResult = new int[value.length];
 
@@ -382,7 +399,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
+      case LONG:
 
         long[] longResult = new long[value.length];
 
@@ -392,7 +409,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
 
         float[] floatResult = new float[value.length];
 
@@ -422,7 +439,7 @@ public final class ValueCompressionUtil {
   private static Object compressNonDecimal(DataType changedDataType, double[] value, int mantissa) {
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
 
         for (double a : value) {
@@ -430,7 +447,7 @@ public final class ValueCompressionUtil {
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
 
         for (double a : value) {
@@ -438,7 +455,7 @@ public final class ValueCompressionUtil {
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
 
         int[] intResult = new int[value.length];
 
@@ -448,7 +465,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
+      case LONG:
 
         long[] longResult = new long[value.length];
 
@@ -458,7 +475,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
 
         float[] floatResult = new float[value.length];
 
@@ -489,7 +506,7 @@ public final class ValueCompressionUtil {
     int i = 0;
     BigDecimal max = BigDecimal.valueOf(maxValue);
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
 
         byte[] result = new byte[value.length];
 
@@ -501,7 +518,7 @@ public final class ValueCompressionUtil {
         }
         return result;
 
-      case DATA_SHORT:
+      case SHORT:
 
         short[] shortResult = new short[value.length];
 
@@ -513,7 +530,7 @@ public final class ValueCompressionUtil {
         }
         return shortResult;
 
-      case DATA_INT:
+      case INT:
 
         int[] intResult = new int[value.length];
 
@@ -525,7 +542,7 @@ public final class ValueCompressionUtil {
         }
         return intResult;
 
-      case DATA_LONG:
+      case LONG:
 
         long[] longResult = new long[value.length];
 
@@ -537,7 +554,7 @@ public final class ValueCompressionUtil {
         }
         return longResult;
 
-      case DATA_FLOAT:
+      case FLOAT:
 
         float[] floatResult = new float[value.length];
 
@@ -570,14 +587,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionNone(DataType compDataType,
       DataType actualDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionNoneByte(actualDataType);
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionNoneShort(actualDataType);
-      case DATA_INT:
+      case INT:
         return new CompressionNoneInt(actualDataType);
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         return new CompressionNoneLong(actualDataType);
       default:
         return new CompressionNoneDefault(actualDataType);
@@ -590,13 +606,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionDecimalMaxMin(
       DataType compDataType, DataType actualDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionMaxMinByte(actualDataType);
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionMaxMinShort(actualDataType);
-      case DATA_INT:
+      case INT:
         return new CompressionMaxMinInt(actualDataType);
-      case DATA_LONG:
+      case LONG:
         return new CompressionMaxMinLong(actualDataType);
       default:
         return new CompressionMaxMinDefault(actualDataType);
@@ -609,13 +625,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionNonDecimal(
       DataType compDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionNonDecimalByte();
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionNonDecimalShort();
-      case DATA_INT:
+      case INT:
         return new CompressionNonDecimalInt();
-      case DATA_LONG:
+      case LONG:
         return new CompressionNonDecimalLong();
       default:
         return new CompressionNonDecimalDefault();
@@ -628,13 +644,13 @@ public final class ValueCompressionUtil {
   public static ValueCompressionHolder getCompressionNonDecimalMaxMin(
       DataType compDataType) {
     switch (compDataType) {
-      case DATA_BYTE:
+      case BYTE:
         return new CompressionNonDecimalMaxMinByte();
-      case DATA_SHORT:
+      case SHORT:
         return new CompressionNonDecimalMaxMinShort();
-      case DATA_INT:
+      case INT:
         return new CompressionNonDecimalMaxMinInt();
-      case DATA_LONG:
+      case LONG:
         return new CompressionNonDecimalMaxMinLong();
       default:
         return new CompressionNonDecimalMaxMinDefault();
@@ -645,10 +661,10 @@ public final class ValueCompressionUtil {
    * Create Value compression model for write path
    */
   public static WriterCompressModel getWriterCompressModel(Object[] maxValue, Object[] minValue,
-      int[] mantissa, Object[] uniqueValue, char[] aggType, byte[] dataTypeSelected) {
+      int[] mantissa, Object[] uniqueValue, DataType[] dataType, byte[] dataTypeSelected) {
     MeasureMetaDataModel metaDataModel =
         new MeasureMetaDataModel(minValue, maxValue, mantissa, maxValue.length, uniqueValue,
-            aggType, dataTypeSelected);
+            dataType, dataTypeSelected);
     return getWriterCompressModel(metaDataModel);
   }
 
@@ -661,7 +677,7 @@ public final class ValueCompressionUtil {
     Object[] maxValue = measureMDMdl.getMaxValue();
     Object[] uniqueValue = measureMDMdl.getUniqueValue();
     int[] mantissa = measureMDMdl.getMantissa();
-    char[] type = measureMDMdl.getType();
+    DataType[] type = measureMDMdl.getType();
     byte[] dataTypeSelected = measureMDMdl.getDataTypeSelected();
     WriterCompressModel compressionModel = new WriterCompressModel();
     DataType[] actualType = new DataType[measureCount];
@@ -772,20 +788,4 @@ public final class ValueCompressionUtil {
      */
     BIGDECIMAL
   }
-
-  /**
-   * use to identify the type of data.
-   */
-  public enum DataType {
-    DATA_BYTE(),
-    DATA_SHORT(),
-    DATA_INT(),
-    DATA_FLOAT(),
-    DATA_LONG(),
-    DATA_BIGINT(),
-    DATA_DOUBLE(),
-    DATA_BIGDECIMAL();
-    DataType() {
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 2c6c890..ddcc8a4 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -28,6 +28,8 @@ import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.format.*;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ColumnSchema;
+import org.apache.carbondata.format.DataType;
+
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -169,7 +171,12 @@ public class CarbonMetadataUtilTest {
     long[] longArr = { 1, 2, 3, 4, 5 };
     byte[][] maxByteArr = { { 1, 2 }, { 3, 4 }, { 5, 6 }, { 2, 4 }, { 1, 2 } };
     int[] cardinality = { 1, 2, 3, 4, 5 };
-    char[] charArr = { 'a', 's', 'd', 'g', 'h' };
+    org.apache.carbondata.core.metadata.datatype.DataType[] dataType = {
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT,
+        org.apache.carbondata.core.metadata.datatype.DataType.INT };
 
     org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema colSchema =
         new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema();
@@ -191,7 +198,7 @@ public class CarbonMetadataUtilTest {
     writerCompressModel.setMinValue(objMinArr);
     writerCompressModel.setDataTypeSelected(byteArr);
     writerCompressModel.setMantissa(intArr);
-    writerCompressModel.setType(charArr);
+    writerCompressModel.setType(dataType);
     writerCompressModel.setUniqueValue(objMinArr);
 
     BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
index 6252ca1..3032085 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.decimal.*;
 import org.apache.carbondata.core.datastore.compression.nondecimal.*;
 import org.apache.carbondata.core.datastore.compression.none.*;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 import org.junit.Test;
 
@@ -36,8 +36,8 @@ public class ValueCompressionUtilTest {
 
   @Test public void testGetSize() {
     DataType[] dataTypes =
-        { DataType.DATA_BIGINT, DataType.DATA_INT, DataType.DATA_BYTE, DataType.DATA_SHORT,
-            DataType.DATA_FLOAT };
+        { DataType.LONG, DataType.INT, DataType.BOOLEAN, DataType.SHORT,
+            DataType.FLOAT };
     int[] expectedSizes = { 8, 4, 1, 2, 4 };
     for (int i = 0; i < dataTypes.length; i++) {
       assertEquals(expectedSizes[i], ValueCompressionUtil.getSize(dataTypes[i]));
@@ -48,7 +48,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 25, 12, 22 };
     int[] result = (int[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_INT, 22, 0);
+            DataType.INT, 22, 0);
     int[] expectedResult = { -3, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -59,7 +59,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     byte[] result = (byte[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_BYTE, 22, 0);
+            DataType.BYTE, 22, 0);
     byte[] expectedResult = { 2, 1, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -70,7 +70,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 200, 21, 22 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_SHORT, 22, 0);
+            DataType.SHORT, 22, 0);
     short[] expectedResult = { -178, 1, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -81,7 +81,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 2000, 2100, 2002 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_LONG, 2125, 0);
+            DataType.LONG, 2125, 0);
     long[] expectedResult = { 125, 25, 123 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -92,7 +92,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_FLOAT, 22.345, 3);
+            DataType.FLOAT, 22.345, 3);
     float[] expectedResult = { 2.224f, 1.122f, 0f };
     for (int i = 0; i < result.length; i++) {
     	assertTrue(result[i]-expectedResult[i]==0);
@@ -103,7 +103,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE, values,
-            DataType.DATA_DOUBLE, 102.345, 3);
+            DataType.DOUBLE, 102.345, 3);
     double[] expectedResult = { 82.224, 81.122, 80.0 };
     for (int i = 0; i < result.length; i++) {
       assertTrue(result[i]-expectedResult[i]==0);
@@ -114,7 +114,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_BIGINT, 22, 0);
+            DataType.LONG, 22, 0);
     long[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -124,7 +124,7 @@ public class ValueCompressionUtilTest {
   @Test public void testToGetCompressedValuesWithCompressionTypeNoneForDataByte() {
     double[] values = { 20, 21, 22 };
     byte[] result = (byte[]) ValueCompressionUtil
-        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.DATA_BYTE,
+        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.BYTE,
             22, 0);
     byte[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
@@ -136,7 +136,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 200000, 21, 22 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_SHORT, 22, 0);
+            DataType.SHORT, 22, 0);
     short[] expectedResult = { 3392, 21, 22 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -146,7 +146,7 @@ public class ValueCompressionUtilTest {
   @Test public void testToGetCompressedValuesWithCompressionTypeNoneForDataInt() {
     double[] values = { 20, 21, 22 };
     int[] result = (int[]) ValueCompressionUtil
-        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.DATA_INT,
+        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.INT,
             22, 0);
     int[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
@@ -157,7 +157,7 @@ public class ValueCompressionUtilTest {
   @Test public void testToGetCompressedValuesWithCompressionTypeNoneForDataLong() {
     double[] values = { 20, 21, 22 };
     long[] result = (long[]) ValueCompressionUtil
-        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.DATA_LONG,
+        .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values, DataType.LONG,
             22, 0);
     long[] expectedResult = { 20, 21, 22 };
     for (int i = 0; i < result.length; i++) {
@@ -169,7 +169,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_FLOAT, 22, 3);
+            DataType.FLOAT, 22, 3);
     float[] expectedResult = { 20.121f, 21.223f, 22.345f };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],3);
@@ -180,7 +180,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.121, 21.223, 22.345 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE, values,
-            DataType.DATA_DOUBLE, 22, 3);
+            DataType.DOUBLE, 22, 3);
     double[] expectedResult = { 20.121, 21.223, 22.345 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],3);
@@ -191,7 +191,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_FLOAT, 22, 1);
+            DataType.FLOAT, 22, 1);
     float[] expectedResult = { 201f, 212f, 223f };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -202,7 +202,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     byte[] result = (byte[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_BYTE, 22, 1);
+            DataType.BYTE, 22, 1);
     byte[] expectedResult = { -55, -44, -33 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -213,7 +213,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_SHORT, 22, 1);
+            DataType.SHORT, 22, 1);
     short[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -224,7 +224,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     int[] result = (int[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_INT, 22, 1);
+            DataType.INT, 22, 1);
     int[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -235,7 +235,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_LONG, 22, 1);
+            DataType.LONG, 22, 1);
     long[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -246,7 +246,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20.1, 21.2, 22.3 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT, values,
-            DataType.DATA_DOUBLE, 22, 1);
+            DataType.DOUBLE, 22, 1);
     double[] expectedResult = { 201, 212, 223 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -257,7 +257,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     byte[] result = (byte[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_BYTE, 22, 1);
+            DataType.BYTE, 22, 1);
     byte[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -268,7 +268,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     int[] result = (int[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_INT, 22, 1);
+            DataType.INT, 22, 1);
     int[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -279,7 +279,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     double[] result = (double[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_DOUBLE, 22, 1);
+            DataType.DOUBLE, 22, 1);
     double[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -290,7 +290,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20000, 21, 22 };
     short[] result = (short[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_SHORT, 22, 1);
+            DataType.SHORT, 22, 1);
     short[] expectedResult = { -3172, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -301,7 +301,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     long[] result = (long[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_LONG, 22, 1);
+            DataType.LONG, 22, 1);
     long[] expectedResult = { 20, 10, 0 };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i]);
@@ -312,7 +312,7 @@ public class ValueCompressionUtilTest {
     double[] values = { 20, 21, 22 };
     float[] result = (float[]) ValueCompressionUtil
         .getCompressedValues(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_NON_DECIMAL, values,
-            DataType.DATA_FLOAT, 22, 1);
+            DataType.FLOAT, 22, 1);
     float[] expectedResult = { 20f, 10f, 0f };
     for (int i = 0; i < result.length; i++) {
       assertEquals(result[i], expectedResult[i],0);
@@ -321,127 +321,127 @@ public class ValueCompressionUtilTest {
 
   @Test public void testToUnCompressNone() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_BIGINT, DataType.DATA_BIGINT);
+        ValueCompressionUtil.getCompressionNone(DataType.LONG, DataType.LONG);
     assertEquals(result.getClass(), CompressionNoneLong.class);
   }
 
   @Test public void testToUnCompressNoneForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_BYTE, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.BYTE, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneByte.class);
   }
 
   @Test public void testToUnCompressNoneForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_LONG, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.LONG, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneLong.class);
   }
 
   @Test public void testToUnCompressNoneForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_SHORT, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.SHORT, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneShort.class);
   }
 
   @Test public void testToUnCompressNoneForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_INT, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.INT, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneInt.class);
   }
 
   @Test public void testToUnCompressNoneForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNone(DataType.DATA_DOUBLE, DataType.DATA_FLOAT);
+        ValueCompressionUtil.getCompressionNone(DataType.DOUBLE, DataType.FLOAT);
     assertEquals(result.getClass(), CompressionNoneDefault.class);
   }
 
   @Test public void testToUnCompressMaxMinForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_DOUBLE, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DOUBLE, null);
     assertEquals(result.getClass(), CompressionMaxMinDefault.class);
   }
 
   @Test public void testToUnCompressMaxMinForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_INT, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.INT, null);
     assertEquals(result.getClass(), CompressionMaxMinInt.class);
   }
 
   @Test public void testToUnCompressMaxMinForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_LONG, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.LONG, null);
     assertEquals(result.getClass(), CompressionMaxMinLong.class);
   }
 
   @Test public void testToUnCompressMaxMinForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_BYTE, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.BYTE, null);
     assertEquals(result.getClass(), CompressionMaxMinByte.class);
   }
 
   @Test public void testToUnCompressMaxMinForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.DATA_SHORT, null);
+        ValueCompressionUtil.getCompressionDecimalMaxMin(DataType.SHORT, null);
     assertEquals(result.getClass(), CompressionMaxMinShort.class);
   }
 
   @Test public void testToUnCompressNonDecimalForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_DOUBLE);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.DOUBLE);
     assertEquals(result.getClass(), CompressionNonDecimalDefault.class);
   }
 
   @Test public void testToUnCompressNonDecimalForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_INT);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.INT);
     assertEquals(result.getClass(), CompressionNonDecimalInt.class);
   }
 
   @Test public void testToUnCompressNonDecimalForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_LONG);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.LONG);
     assertEquals(result.getClass(), CompressionNonDecimalLong.class);
   }
 
   @Test public void testToUnCompressNonDecimalForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_BYTE);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.BYTE);
     assertEquals(result.getClass(), CompressionNonDecimalByte.class);
   }
 
   @Test public void testToUnCompressNonDecimalForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimal(DataType.DATA_SHORT);
+        ValueCompressionUtil.getCompressionNonDecimal(DataType.SHORT);
     assertEquals(result.getClass(), CompressionNonDecimalShort.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForDouble() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_DOUBLE);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DOUBLE);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinDefault.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForInt() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_INT);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.INT);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinInt.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForLong() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_LONG);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.LONG);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinLong.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForByte() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_BYTE);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.BYTE);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinByte.class);
   }
 
   @Test public void testToUnCompressNonDecimalMaxMinForShort() {
     ValueCompressionHolder result =
-        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.DATA_SHORT);
+        ValueCompressionUtil.getCompressionNonDecimalMaxMin(DataType.SHORT);
     assertEquals(result.getClass(), CompressionNonDecimalMaxMinShort.class);
   }
 
@@ -490,7 +490,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 1L, 2L, 3L };
     int[] decimalLength = { 0, 0, 0 };
     Object[] uniqueValues = { 5, new Long[]{2L,4L}, 2L};
-    char[] types = { 'l', 'l', 'l' };
+    DataType[] types = { DataType.LONG, DataType.LONG, DataType.LONG };
     byte[] dataTypeSelected = { 1, 2, 4 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 3, uniqueValues, types,
@@ -510,7 +510,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 1.0 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -526,7 +526,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32500.00 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -542,7 +542,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 1111078433.0 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -558,7 +558,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 0 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE};
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -574,7 +574,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 1 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -590,7 +590,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 1 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 0 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
@@ -606,7 +606,7 @@ public class ValueCompressionUtilTest {
     Object[] minValues = { 32744.0 };
     int[] decimalLength = { 1 };
     Object[] uniqueValues = { 5 };
-    char[] types = { 'n' };
+    DataType[] types = { DataType.DOUBLE };
     byte[] dataTypeSelected = { 1 };
     MeasureMetaDataModel measureMetaDataModel =
         new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
index d02e25f..480ed04 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
@@ -184,7 +184,7 @@ public class CarbonFooterWriterTest extends TestCase{
     compressionModel.setMaxValue(new Object[] { 44d, 55d });
     compressionModel.setMinValue(new Object[] { 0d, 0d });
     compressionModel.setMantissa(new int[] { 0, 0 });
-    compressionModel.setType(new char[] { 'n', 'n' });
+    compressionModel.setType(new DataType[] { DataType.DOUBLE, DataType.DOUBLE });
     compressionModel.setUniqueValue(new Object[] { 0d, 0d });
     compressionModel.setDataTypeSelected(new byte[2]);
     infoColumnar.setCompressionModel(compressionModel);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 371b9bb..9ae01b8 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -43,6 +43,7 @@ object CarbonSessionExample {
       .master("local")
       .appName("CarbonSessionExample")
       .config("spark.sql.warehouse.dir", warehouse)
+      .config("spark.driver.host", "localhost")
       .getOrCreateCarbonSession(storeLocation, metastoredb)
 
     spark.sparkContext.setLogLevel("WARN")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index 937108c..b4cbc4e 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -114,8 +114,12 @@ struct DataChunk{
 }
 
 /**
- * Represents a chunk of data. The chunk can be a single column stored in Column Major format or a group of columns stored in Row Major Format.
- * For V2 format.
+ * Represents the metadata of a data chunk.
+ * The chunk can be a single column stored in Column Major format or a group of columns stored
+ * in Row Major format.
+ *
+ * For V3, one data chunk is one page data of 32K rows.
+ * For V2 & V1, one data chunk is one blocklet data.
  */
 struct DataChunk2{
     1: required ChunkCompressionMeta chunk_meta; // The metadata of a chunk

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
index f31d434..5d6c07a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/emptyrow/TestEmptyRows.scala
@@ -28,7 +28,7 @@ class TestEmptyRows extends QueryTest with BeforeAndAfterAll {
   override def beforeAll {
     sql("drop table if exists emptyRowCarbonTable")
     sql("drop table if exists emptyRowHiveTable")
-    //eid,ename,sal,presal,comm,deptno,Desc
+
     sql(
       "create table if not exists emptyRowCarbonTable (eid int,ename String,sal decimal,presal " +
         "decimal,comm decimal" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
index 3d85814..fa7b970 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNoMeasure.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 class TestLoadDataWithNoMeasure extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
-    sql("DROP TABLE IF EXISTS nomeasureTest_sd")
+    sql("DROP TABLE IF EXISTS nomeasureTest")
     sql(
       "CREATE TABLE nomeasureTest (empno String, doj String) STORED BY 'org.apache.carbondata" +
         ".format'"
@@ -106,8 +106,8 @@ class TestLoadDataWithNoMeasure extends QueryTest with BeforeAndAfterAll {
   }
 
   override def afterAll {
-    sql("drop table nomeasureTest")
-    sql("drop table nomeasureTest_sd")
-    sql("drop table nomeasureTest_scd")
+    sql("drop table if exists nomeasureTest")
+    sql("drop table if exists nomeasureTest_sd")
+    sql("drop table if exists nomeasureTest_scd")
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a79a3a16/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
index f1c1d69..ab003c0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/detailquery/ColumnGroupDataTypesTestCase.scala
@@ -27,6 +27,12 @@ import org.scalatest.BeforeAndAfterAll
 class ColumnGroupDataTypesTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
+    sql("drop table if exists colgrp")
+    sql("drop table if exists normal")
+    sql("drop table if exists colgrp_dictexclude_before")
+    sql("drop table if exists colgrp_dictexclude_after")
+    sql("drop table if exists colgrp_disorder")
+
     sql("create table colgrp (column1 string,column2 string,column3 string,column4 string,column5 string,column6 string,column7 string,column8 string,column9 string,column10 string,measure1 int,measure2 int,measure3 int,measure4 int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES (\"COLUMN_GROUPS\"=\"(column2,column3,column4),(column7,column8,column9)\")")
     sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/10dim_4msr.csv' INTO table colgrp options('FILEHEADER'='column1,column2,column3,column4,column5,column6,column7,column8,column9,column10,measure1,measure2,measure3,measure4')");
     sql("create table normal (column1 string,column2 string,column3 string,column4 string,column5 string,column6 string,column7 string,column8 string,column9 string,column10 string,measure1 int,measure2 int,measure3 int,measure4 int) STORED BY 'org.apache.carbondata.format'")