You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/15 05:06:25 UTC

[12/19] carbondata git commit: [CARBONDATA-1015] Refactory write step and add ColumnPage in data load This closes #852

http://git-wip-us.apache.org/repos/asf/carbondata/blob/98df130a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 6d81d59..9a7450a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -25,6 +25,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -48,9 +49,12 @@ import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInv
 import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.datastore.impl.data.compressed.HeavyCompressedDoubleArrayDataStore;
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
+import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -58,6 +62,7 @@ import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengt
 import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -67,10 +72,6 @@ import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
-import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
-import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
-import org.apache.carbondata.processing.store.colgroup.ColumnDataHolder;
-import org.apache.carbondata.processing.store.colgroup.DataHolder;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
@@ -116,7 +117,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private int mdKeyIndex;
   /**
-   * blocklet size
+   * blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
+   * once this size of input is reached
    */
   private int blockletSize;
   /**
@@ -140,14 +142,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private int tableBlockSize;
   /**
-   * otherMeasureIndex
-   */
-  private int[] otherMeasureIndex;
-  /**
-   * customMeasureIndex
-   */
-  private int[] customMeasureIndex;
-  /**
    * dimLens
    */
   private int[] dimLens;
@@ -161,18 +155,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private CarbonKeyBlockHolder[] keyBlockHolder;
   private boolean[] aggKeyBlock;
   private boolean[] isNoDictionary;
-  private boolean isAggKeyBlock;
   private long processedDataCount;
-  /**
-   * thread pool size to be used for block sort
-   */
-  private int thread_pool_size;
   private KeyGenerator[] complexKeyGenerator;
-  /**
-   * isDataWritingRequest
-   */
-  //    private boolean isDataWritingRequest;
-
   private ExecutorService producerExecutorService;
   private List<Future<Void>> producerExecutorServiceTaskList;
   private ExecutorService consumerExecutorService;
@@ -181,7 +165,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private int noDictionaryCount;
   private ColumnGroupModel colGrpModel;
   private int[] primitiveDimLens;
-  private char[] type;
+  private DataType[] type;
   private int[] completeDimLens;
   private boolean[] isUseInvertedIndex;
   /**
@@ -201,10 +185,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private BlockletDataHolder blockletDataHolder;
   /**
-   * a private class which will take each blocklet in order and write to a file
-   */
-  private Consumer consumer;
-  /**
    * number of cores configured
    */
   private int numberOfCores;
@@ -227,20 +207,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private int complexColCount;
 
   /**
-   * no of column blocks
-   */
-  private int columnStoreCount;
-
-  /**
    * column schema present in the table
    */
   private List<ColumnSchema> wrapperColumnSchemaList;
 
   /**
    * boolean to check whether dimension
-   * is of dictionary type or no dictionary time
+   * is of dictionary type or no dictionary type
    */
-  private boolean[] dimensionType;
+  private boolean[] isDictDimension;
 
   /**
    * colCardinality for the merge case.
@@ -260,8 +235,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   private long schemaUpdatedTimeStamp;
 
-  private String segmentId;
-
   private int taskExtension;
 
   /**
@@ -277,19 +250,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dimensionCount = carbonFactDataHandlerModel.getDimensionCount();
     this.complexIndexMap = carbonFactDataHandlerModel.getComplexIndexMap();
     this.primitiveDimLens = carbonFactDataHandlerModel.getPrimitiveDimLens();
-    this.isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
-            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
     this.carbonDataDirectoryPath = carbonFactDataHandlerModel.getCarbonDataDirectoryPath();
-    this.complexColCount = getComplexColsCount();
-    this.columnStoreCount =
-        this.colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount;
+    this.complexColCount = getExpandedComplexColsCount();
 
-    this.aggKeyBlock = new boolean[columnStoreCount];
-    this.isNoDictionary = new boolean[columnStoreCount];
+    int numDimColumns = colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount;
+    this.aggKeyBlock = new boolean[numDimColumns];
+    this.isNoDictionary = new boolean[numDimColumns];
     this.bucketNumber = carbonFactDataHandlerModel.getBucketId();
     this.taskExtension = carbonFactDataHandlerModel.getTaskExtension();
-    this.isUseInvertedIndex = new boolean[columnStoreCount];
+    this.isUseInvertedIndex = new boolean[numDimColumns];
     if (null != carbonFactDataHandlerModel.getIsUseInvertedIndex()) {
       for (int i = 0; i < isUseInvertedIndex.length; i++) {
         if (i < carbonFactDataHandlerModel.getIsUseInvertedIndex().length) {
@@ -305,6 +274,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       this.isNoDictionary[noDictStartIndex + i] = true;
     }
 
+    boolean isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
+            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
     if (isAggKeyBlock) {
       int noDictionaryValue = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
@@ -346,7 +318,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.databaseName = carbonFactDataHandlerModel.getDatabaseName();
     this.tableBlockSize = carbonFactDataHandlerModel.getBlockSizeInMB();
     this.tableName = carbonFactDataHandlerModel.getTableName();
-    this.type = carbonFactDataHandlerModel.getAggType();
+    this.type = carbonFactDataHandlerModel.getMeasureDataType();
     this.segmentProperties = carbonFactDataHandlerModel.getSegmentProperties();
     this.wrapperColumnSchemaList = carbonFactDataHandlerModel.getWrapperColumnSchema();
     this.colCardinality = carbonFactDataHandlerModel.getColCardinality();
@@ -360,11 +332,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     this.dimLens = this.segmentProperties.getDimColumnsCardinality();
     this.carbonDataFileAttributes = carbonFactDataHandlerModel.getCarbonDataFileAttributes();
     this.schemaUpdatedTimeStamp = carbonFactDataHandlerModel.getSchemaUpdatedTimeStamp();
-    this.segmentId = carbonFactDataHandlerModel.getSegmentId();
+
     //TODO need to pass carbon table identifier to metadata
     CarbonTable carbonTable = CarbonMetadata.getInstance()
         .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    dimensionType =
+    isDictDimension =
         CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(tableName));
 
     this.compactionFlow = carbonFactDataHandlerModel.isCompactionFlow();
@@ -403,15 +375,17 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     consumerExecutorServiceTaskList = new ArrayList<>(1);
     semaphore = new Semaphore(numberOfCores);
     blockletDataHolder = new BlockletDataHolder();
-    consumer = new Consumer(blockletDataHolder);
+
+    // Start the consumer which will take each blocklet/page in order and write to a file
+    Consumer consumer = new Consumer(blockletDataHolder);
     consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer));
   }
 
   private boolean[] arrangeUniqueBlockType(boolean[] aggKeyBlock) {
     int counter = 0;
     boolean[] uniqueBlock = new boolean[aggKeyBlock.length];
-    for (int i = 0; i < dimensionType.length; i++) {
-      if (dimensionType[i]) {
+    for (int i = 0; i < isDictDimension.length; i++) {
+      if (isDictDimension[i]) {
         uniqueBlock[i] = aggKeyBlock[counter++];
       } else {
         uniqueBlock[i] = false;
@@ -463,8 +437,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     if (this.entryCount == this.blockletSize) {
       try {
         semaphore.acquire();
-        producerExecutorServiceTaskList.add(producerExecutorService.submit(
-            new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)));
+
+        producerExecutorServiceTaskList.add(
+            producerExecutorService.submit(
+                new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
+            )
+        );
         blockletProcessingCount.incrementAndGet();
         // set the entry count to zero
         processedDataCount += entryCount;
@@ -478,417 +456,268 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
   }
 
-  /** statics for one blocklet/page */
-  class Statistics {
-    /** min and max value of the measures */
-    Object[] min, max;
-
-    /**
-     * the unique value is the non-exist value in the row,
-     * and will be used as storage key for null values of measures
-     */
-    Object[] uniqueValue;
-
-    /** decimal count of the measures */
-    int[] decimal;
-
-    Statistics(int measureCount) {
-      max = new Object[measureCount];
-      min = new Object[measureCount];
-      uniqueValue = new Object[measureCount];
-      decimal = new int[measureCount];
-      for (int i = 0; i < measureCount; i++) {
-        if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          max[i] = Long.MIN_VALUE;
-          min[i] = Long.MAX_VALUE;
-          uniqueValue[i] = Long.MIN_VALUE;
-        } else if (type[i] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          max[i] = Double.MIN_VALUE;
-          min[i] = Double.MAX_VALUE;
-          uniqueValue[i] = Double.MIN_VALUE;
-        } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          max[i] = new BigDecimal(Double.MIN_VALUE);
-          min[i] = new BigDecimal(Double.MAX_VALUE);
-          uniqueValue[i] = new BigDecimal(Double.MIN_VALUE);
-        } else {
-          max[i] = 0.0;
-          min[i] = 0.0;
-          uniqueValue[i] = 0.0;
-        }
-        decimal[i] = 0;
-      }
-    }
-
-    /**
-     * update the statistics for the input row
-     */
-    void update(int[] msrIndex, Object[] row, boolean compactionFlow) {
-      // Update row level min max
-      for (int i = 0; i < msrIndex.length; i++) {
-        int count = msrIndex[i];
-        if (row[count] != null) {
-          if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
-            double value = (double) row[count];
-            double maxVal = (double) max[count];
-            double minVal = (double) min[count];
-            max[count] = (maxVal > value ? max[count] : value);
-            min[count] = (minVal < value ? min[count] : value);
-            int num = getDecimalCount(value);
-            decimal[count] = (decimal[count] > num ? decimal[count] : num);
-            uniqueValue[count] = (double) min[count] - 1;
-          } else if (type[count] == CarbonCommonConstants.BIG_INT_MEASURE) {
-            long value = (long) row[count];
-            long maxVal = (long) max[count];
-            long minVal = (long) min[count];
-            max[count] = (maxVal > value ? max[count] : value);
-            min[count] = (minVal < value ? min[count] : value);
-            uniqueValue[count] = (long) min[count] - 1;
-          } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-            byte[] buff = null;
-            // in compaction flow the measure with decimal type will come as spark decimal.
-            // need to convert it to byte array.
-            if (compactionFlow) {
-              BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
-              buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
-            } else {
-              buff = (byte[]) row[count];
-            }
-            BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
-            decimal[count] = value.scale();
-            BigDecimal val = (BigDecimal) min[count];
-            uniqueValue[count] = (val.subtract(new BigDecimal(1.0)));
-          }
-        }
-      }
-    }
-  }
-
   class IndexKey {
+    private int pageSize;
     byte[] currentMDKey = null;
     byte[][] currentNoDictionaryKey = null;
     byte[] startKey = null;
     byte[] endKey = null;
     byte[][] noDictStartKey = null;
     byte[][] noDictEndKey = null;
+    byte[] packedNoDictStartKey = null;
+    byte[] packedNoDictEndKey = null;
+
+    IndexKey(int pageSize) {
+      this.pageSize = pageSize;
+    }
 
     /** update all keys based on the input row */
-    void update(Object[] row, boolean firstRow) {
+    void update(int rowId, Object[] row) {
       currentMDKey = (byte[]) row[mdKeyIndex];
       if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
         currentNoDictionaryKey = (byte[][]) row[mdKeyIndex - 1];
       }
-      if (firstRow) {
+      if (rowId == 0) {
         startKey = currentMDKey;
         noDictStartKey = currentNoDictionaryKey;
       }
       endKey = currentMDKey;
       noDictEndKey = currentNoDictionaryKey;
-    }
-  }
-
-  /** generate the NodeHolder from the input rows */
-  private NodeHolder processDataRows(List<Object[]> dataRows)
-      throws CarbonDataWriterException {
-    if (dataRows.size() == 0) {
-      return new NodeHolder();
-    }
-    // to store index of the measure columns which are null
-    BitSet[] nullValueIndexBitSet = getMeasureNullValueIndexBitSet(measureCount);
-    // statistics for one blocklet/page
-    Statistics stats = new Statistics(measureCount);
-    IndexKey keys = new IndexKey();
-
-    // initialize measureHolder, mdKeyHolder and noDictionaryHolder, these three Holders
-    // are the input for final encoding
-    CarbonWriteDataHolder[] measureHolder = initialiseDataHolder(dataRows.size());
-    CarbonWriteDataHolder mdKeyHolder = initialiseKeyBlockHolder(dataRows.size());
-    CarbonWriteDataHolder noDictionaryHolder = null;
-    if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryHolder = initialiseKeyBlockHolderForNonDictionary(dataRows.size());
-    }
-
-    // loop on the input rows, fill measureHolder, mdKeyHolder and noDictionaryHolder
-    for (int count = 0; count < dataRows.size(); count++) {
-      Object[] row = dataRows.get(count);
-      keys.update(row, (count == 0));
-      if (keys.currentMDKey.length > 0) {
-        mdKeyHolder.setWritableByteArrayValueByIndex(count, keys.currentMDKey);
-      }
-      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-        noDictionaryHolder.setWritableNonDictByteArrayValueByIndex(count,
-            keys.currentNoDictionaryKey);
+      if (rowId == pageSize - 1) {
+        finalizeKeys();
       }
-      fillMeasureHolder(row, count, measureHolder, nullValueIndexBitSet);
-      stats.update(otherMeasureIndex, row, compactionFlow);
-      stats.update(customMeasureIndex, row, compactionFlow);
     }
 
-    // generate encoded byte array for 3 holders
-    // for measure columns: encode and compress the measureHolder
-    WriterCompressModel compressionModel =
-        ValueCompressionUtil.getWriterCompressModel(
-            stats.max, stats.min, stats.decimal, stats.uniqueValue, type, new byte[measureCount]);
-    byte[][] encodedMeasureArray =
-        HeavyCompressedDoubleArrayDataStore.encodeMeasureDataArray(
-            compressionModel, measureHolder);
-
-    // for mdkey and noDictionary, it is already in bytes, just get the array from holder
-    byte[][] mdKeyArray = mdKeyHolder.getByteArrayValues();
-    byte[][][] noDictionaryArray = null;
-    if ((noDictionaryCount + complexColCount) > 0) {
-      noDictionaryArray = noDictionaryHolder.getNonDictByteArrayValues();
-    }
-
-    // create NodeHolder using these encoded byte arrays
-    NodeHolder nodeHolder =
-        createNodeHolderObjectWithOutKettle(
-            encodedMeasureArray, mdKeyArray, noDictionaryArray, dataRows.size(),
-            keys.startKey, keys.endKey, compressionModel, keys.noDictStartKey, keys.noDictEndKey,
-            nullValueIndexBitSet);
-    LOGGER.info("Number Of records processed: " + dataRows.size());
-    return nodeHolder;
-  }
-
-  private void fillMeasureHolder(Object[] row, int count, CarbonWriteDataHolder[] measureHolder,
-      BitSet[] nullValueIndexBitSet) {
-    for (int k = 0; k < otherMeasureIndex.length; k++) {
-      if (type[otherMeasureIndex[k]] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        if (null == row[otherMeasureIndex[k]]) {
-          nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-          measureHolder[otherMeasureIndex[k]].setWritableLongValueByIndex(count, 0L);
-        } else {
-          measureHolder[otherMeasureIndex[k]]
-              .setWritableLongValueByIndex(count, row[otherMeasureIndex[k]]);
+    /** update all keys if SORT_COLUMNS option is used when creating table */
+    private void finalizeKeys() {
+      // If SORT_COLUMNS is used, may need to update start/end keys since the they may
+      // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
+      // start/end key
+      int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
+      if (numberOfDictSortColumns > 0) {
+        // if SORT_COLUMNS contain dictionary columns
+        int[] keySize = columnarSplitter.getBlockKeySize();
+        if (keySize.length > numberOfDictSortColumns) {
+          // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
+          int newMdkLength = 0;
+          for (int i = 0; i < numberOfDictSortColumns; i++) {
+            newMdkLength += keySize[i];
+          }
+          byte[] newStartKeyOfSortKey = new byte[newMdkLength];
+          byte[] newEndKeyOfSortKey = new byte[newMdkLength];
+          System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
+          System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
+          startKey = newStartKeyOfSortKey;
+          endKey = newEndKeyOfSortKey;
         }
       } else {
-        if (null == row[otherMeasureIndex[k]]) {
-          nullValueIndexBitSet[otherMeasureIndex[k]].set(count);
-          measureHolder[otherMeasureIndex[k]].setWritableDoubleValueByIndex(count, 0.0);
-        } else {
-          measureHolder[otherMeasureIndex[k]]
-              .setWritableDoubleValueByIndex(count, row[otherMeasureIndex[k]]);
-        }
+        startKey = new byte[0];
+        endKey = new byte[0];
       }
-    }
-    ByteBuffer byteBuffer = null;
-    byte[] measureBytes = null;
-    for (int i = 0; i < customMeasureIndex.length; i++) {
-      if (null == row[customMeasureIndex[i]]
-          && type[customMeasureIndex[i]] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        measureBytes = DataTypeUtil.zeroBigDecimalBytes;
-        nullValueIndexBitSet[customMeasureIndex[i]].set(count);
-      } else {
-        if (this.compactionFlow) {
-          BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
-          measureBytes = DataTypeUtil.bigDecimalToByte(bigDecimal);
-        } else {
-          measureBytes = (byte[]) row[customMeasureIndex[i]];
+
+      // Do the same update for noDictionary start/end Key
+      int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
+      if (numberOfNoDictSortColumns > 0) {
+        // if sort_columns contain no-dictionary columns
+        if (noDictStartKey.length > numberOfNoDictSortColumns) {
+          byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
+          byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
+          System.arraycopy(
+              noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
+          System.arraycopy(
+              noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
+          noDictStartKey = newNoDictionaryStartKey;
+          noDictEndKey = newNoDictionaryEndKey;
         }
+        packedNoDictStartKey =
+            NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
+        packedNoDictEndKey =
+            NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
       }
-      byteBuffer = ByteBuffer.allocate(measureBytes.length +
-          CarbonCommonConstants.INT_SIZE_IN_BYTE);
-      byteBuffer.putInt(measureBytes.length);
-      byteBuffer.put(measureBytes);
-      byteBuffer.flip();
-      measureBytes = byteBuffer.array();
-      measureHolder[customMeasureIndex[i]].setWritableByteArrayValueByIndex(count, measureBytes);
     }
   }
 
-  private NodeHolder createNodeHolderObjectWithOutKettle(byte[][] measureArray, byte[][] mdKeyArray,
-      byte[][][] noDictionaryArray, int entryCountLocal, byte[] startkeyLocal, byte[] endKeyLocal,
-      WriterCompressModel compressionModel, byte[][] noDictionaryStartKey,
-      byte[][] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
-      throws CarbonDataWriterException {
-    byte[][][] noDictionaryColumnsData = null;
-    List<ArrayList<byte[]>> colsAndValues = new ArrayList<ArrayList<byte[]>>();
-    int complexColCount = getComplexColsCount();
-
-    for (int i = 0; i < complexColCount; i++) {
-      colsAndValues.add(new ArrayList<byte[]>());
+  /**
+   * Represent a page data for all columns, we store its data in columnar layout, so that
+   * all processing apply to TablePage can be done in vectorized fashion.
+   */
+  class TablePage {
+
+    // For all dimension and measure columns, we store the column data directly in the page,
+    // the length of the page is the number of rows.
+
+    // TODO: we should have separate class for key columns so that keys are stored together in
+    // one vector to make it efficient for sorting
+    VarLengthColumnPage[] dictDimensionPage;
+    VarLengthColumnPage[] noDictDimensionPage;
+    ComplexColumnPage[] complexDimensionPage;
+    FixLengthColumnPage[] measurePage;
+
+    // the num of rows in this page, it must be less than short value (65536)
+    int pageSize;
+
+    TablePage(int pageSize) {
+      this.pageSize = pageSize;
+      dictDimensionPage = new VarLengthColumnPage[dimensionCount];
+      for (int i = 0; i < dictDimensionPage.length; i++) {
+        dictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+      }
+      noDictDimensionPage = new VarLengthColumnPage[noDictionaryCount];
+      for (int i = 0; i < noDictDimensionPage.length; i++) {
+        noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+      }
+      complexDimensionPage = new ComplexColumnPage[getComplexColumnCount()];
+      for (int i = 0; i < complexDimensionPage.length; i++) {
+        // here we still do not the depth of the complex column, it will be initialized when
+        // we get the first row.
+        complexDimensionPage[i] = null;
+      }
+      measurePage = new FixLengthColumnPage[measureCount];
+      for (int i = 0; i < measurePage.length; i++) {
+        measurePage[i] = new FixLengthColumnPage(type[i], pageSize);
+      }
     }
-    int noOfColumn = colGrpModel.getNoOfColumnStore();
-    DataHolder[] dataHolders = getDataHolders(noOfColumn, mdKeyArray.length);
-    for (int i = 0; i < mdKeyArray.length; i++) {
-      byte[][] splitKey = columnarSplitter.splitKey(mdKeyArray[i]);
 
-      for (int j = 0; j < splitKey.length; j++) {
-        dataHolders[j].addData(splitKey[j], i);
+    /**
+     * Add one row to the internal store, it will be converted into columnar layout
+     * @param rowId Id of the input row
+     * @param rows row object
+     */
+    void addRow(int rowId, Object[] rows) {
+
+      // convert dictionary columns
+      byte[] MDKey = (byte[]) rows[mdKeyIndex];
+      if (columnarSplitter != null) {
+        byte[][] splitKey = columnarSplitter.splitKey(MDKey);
+        for (int i = 0; i < splitKey.length; i++) {
+          dictDimensionPage[i].putByteArray(rowId, splitKey[i]);
+        }
       }
-    }
-    if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-      noDictionaryColumnsData = new byte[noDictionaryCount][noDictionaryArray.length][];
-      for (int i = 0; i < noDictionaryArray.length; i++) {
-        int complexColumnIndex = primitiveDimLens.length + noDictionaryCount;
-        byte[][] splitKey = noDictionaryArray[i];
-
-        int complexTypeIndex = 0;
-        for (int j = 0; j < splitKey.length; j++) {
-          //nodictionary Columns
-          if (j < noDictionaryCount) {
-            int keyLength = splitKey[j].length;
-            byte[] newKey = new byte[keyLength + 2];
-            ByteBuffer buffer = ByteBuffer.wrap(newKey);
-            buffer.putShort((short) keyLength);
-            System.arraycopy(splitKey[j], 0, newKey, 2, keyLength);
-            noDictionaryColumnsData[j][i] = newKey;
-          }
-          //complex types
-          else {
-            // Need to write columnar block from complex byte array
-            int index = complexColumnIndex - noDictionaryCount;
-            GenericDataType complexDataType = complexIndexMap.get(index);
-            complexColumnIndex++;
-            if (complexDataType != null) {
-              List<ArrayList<byte[]>> columnsArray = new ArrayList<ArrayList<byte[]>>();
-              for (int k = 0; k < complexDataType.getColsCount(); k++) {
-                columnsArray.add(new ArrayList<byte[]>());
-              }
-
-              try {
-                ByteBuffer byteArrayInput = ByteBuffer.wrap(splitKey[j]);
-                ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
-                DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
-                complexDataType
-                    .parseAndBitPack(byteArrayInput, dataOutputStream, this.complexKeyGenerator);
-                complexDataType.getColumnarDataForComplexType(columnsArray,
-                    ByteBuffer.wrap(byteArrayOutput.toByteArray()));
-                byteArrayOutput.close();
-              } catch (IOException e) {
-                throw new CarbonDataWriterException(
-                    "Problem while bit packing and writing complex datatype", e);
-              } catch (KeyGenException e) {
-                throw new CarbonDataWriterException(
-                    "Problem while bit packing and writing complex datatype", e);
-              }
-
-              for (ArrayList<byte[]> eachColumn : columnsArray) {
-                colsAndValues.get(complexTypeIndex++).addAll(eachColumn);
-              }
-            } else {
-              // This case not possible as ComplexType is the last columns
-            }
+
+      // convert noDictionary columns and complex columns.
+      if (noDictionaryCount > 0 || complexColCount > 0) {
+        byte[][] noDictAndComplex = (byte[][])(rows[mdKeyIndex - 1]);
+        for (int i = 0; i < noDictAndComplex.length; i++) {
+          if (i < noDictionaryCount) {
+            // noDictionary columns, since it is variable length, we need to prepare each
+            // element as LV encoded byte array (first two bytes are the length of the array)
+            byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
+            noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
+          } else {
+            // complex columns
+            addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
           }
         }
       }
-    }
-    thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
-            CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
-    ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
-    List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
-        primitiveDimLens.length + noDictionaryCount + complexColCount);
-    int i = 0;
-    int dictionaryColumnCount = -1;
-    int noDictionaryColumnCount = -1;
-    boolean isSortColumn = false;
-    for (i = 0; i < dimensionType.length; i++) {
-      isSortColumn = i < segmentProperties.getNumberOfSortColumns();
-      if (dimensionType[i]) {
-        dictionaryColumnCount++;
-        if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-          submit.add(executorService.submit(
-              new BlockSortThread(i, dataHolders[dictionaryColumnCount].getData(), isSortColumn,
-                  isUseInvertedIndex[i] & isSortColumn)));
-        } else {
-          submit.add(
-              executorService.submit(new ColGroupBlockStorage(dataHolders[dictionaryColumnCount])));
+
+      // convert measure columns
+      for (int i = 0; i < type.length; i++) {
+        Object value = rows[i];
+
+        // in compaction flow the measure with decimal type will come as spark decimal.
+        // need to convert it to byte array.
+        if (type[i] == DataType.DECIMAL && compactionFlow) {
+          BigDecimal bigDecimal = ((Decimal) rows[i]).toJavaBigDecimal();
+          value = DataTypeUtil.bigDecimalToByte(bigDecimal);
         }
-      } else {
-        submit.add(executorService.submit(
-            new BlockSortThread(i, noDictionaryColumnsData[++noDictionaryColumnCount], false, true,
-                isSortColumn, isUseInvertedIndex[i] & isSortColumn)));
+        measurePage[i].putData(rowId, value);
       }
     }
-    for (int k = 0; k < complexColCount; k++) {
-      submit.add(executorService.submit(new BlockSortThread(i++,
-          colsAndValues.get(k).toArray(new byte[colsAndValues.get(k).size()][]), false, true)));
-    }
-    executorService.shutdown();
-    try {
-      executorService.awaitTermination(1, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    IndexStorage[] blockStorage =
-        new IndexStorage[colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount];
-    try {
-      for (int k = 0; k < blockStorage.length; k++) {
-        blockStorage[k] = submit.get(k).get();
+
+    /**
+     * add a complex column into internal member compleDimensionPage
+     * @param index index of the complexDimensionPage
+     * @param rowId Id of the input row
+     * @param complexColumns byte array the complex columm to be added, extracted of input row
+     */
+    // TODO: this function should be refactoried, ColumnPage should support complex type encoding
+    // directly instead of doing it here
+    private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
+      GenericDataType complexDataType = complexIndexMap.get(index + primitiveDimLens.length);
+
+      // initialize the page if first row
+      if (rowId == 0) {
+        int depthInComplexColumn = complexDataType.getColsCount();
+        complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
       }
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    byte[] composedNonDictStartKey = null;
-    byte[] composedNonDictEndKey = null;
-
-    int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
-    // generate start/end key by sort_columns
-    if (numberOfDictSortColumns > 0) {
-      // if sort_columns contain dictionary columns
-      int[] keySize = columnarSplitter.getBlockKeySize();
-      if (keySize.length > numberOfDictSortColumns) {
-        int newMdkLength = 0;
-        for (int index = 0; index < numberOfDictSortColumns; index++) {
-          newMdkLength += keySize[index];
-        }
-        byte[] newStartKeyOfSortKey = new byte[newMdkLength];
-        byte[] newEndKeyOfSortKey = new byte[newMdkLength];
-        System.arraycopy(startkeyLocal, 0, newStartKeyOfSortKey, 0, newMdkLength);
-        System.arraycopy(endKeyLocal, 0, newEndKeyOfSortKey, 0, newMdkLength);
-        startkeyLocal = newStartKeyOfSortKey;
-        endKeyLocal = newEndKeyOfSortKey;
+
+      int depthInComplexColumn = complexDimensionPage[index].getDepth();
+      // this is the encoded columnar data which will be added to page,
+      // size of this list is the depth of complex column, we will fill it by input data
+      List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
+      for (int k = 0; k < depthInComplexColumn; k++) {
+        encodedComplexColumnar.add(new ArrayList<byte[]>());
+      }
+
+      // encode the complex type data and fill columnsArray
+      try {
+        ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
+        ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
+        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
+        complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream, complexKeyGenerator);
+        complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
+            ByteBuffer.wrap(byteArrayOutput.toByteArray()));
+        byteArrayOutput.close();
+      } catch (IOException | KeyGenException e) {
+        throw new CarbonDataWriterException(
+            "Problem while bit packing and writing complex datatype", e);
       }
-    } else {
-      startkeyLocal = new byte[0];
-      endKeyLocal = new byte[0];
-    }
 
-    int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
-    if (numberOfNoDictSortColumns > 0) {
-      // if sort_columns contain no-dictionary columns
-      if (noDictionaryStartKey.length > numberOfNoDictSortColumns) {
-        byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
-        byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
-        System.arraycopy(noDictionaryStartKey, 0, newNoDictionaryStartKey, 0,
-            numberOfNoDictSortColumns);
-        System
-            .arraycopy(noDictionaryEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
-        noDictionaryStartKey = newNoDictionaryStartKey;
-        noDictionaryEndKey = newNoDictionaryEndKey;
+      for (int depth = 0; depth < depthInComplexColumn; depth++) {
+        complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
       }
-      composedNonDictStartKey =
-          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryStartKey);
-      composedNonDictEndKey =
-          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictionaryEndKey);
     }
-    return this.dataWriter
-        .buildDataNodeHolder(blockStorage, measureArray, entryCountLocal, startkeyLocal,
-            endKeyLocal, compressionModel, composedNonDictStartKey, composedNonDictEndKey,
-            nullValueIndexBitSet);
+
+    // Adds length as a short element (first 2 bytes) to the head of the input byte array
+    private byte[] addLengthToByteArray(byte[] input) {
+      byte[] output = new byte[input.length + 2];
+      ByteBuffer buffer = ByteBuffer.wrap(output);
+      buffer.putShort((short) input.length);
+      buffer.put(input, 0, input.length);
+      return output;
+    }
+
   }
 
   /**
-   * DataHolder will have all row mdkey data
-   *
-   * @param noOfColumn : no of column participated in mdkey
-   * @param noOfRow    : total no of row
-   * @return : dataholder
+   * generate the NodeHolder from the input rows (one page in case of V3 format)
    */
-  private DataHolder[] getDataHolders(int noOfColumn, int noOfRow) {
-    DataHolder[] dataHolders = new DataHolder[noOfColumn];
-    int colGrpId = -1;
-    for (int colGrp = 0; colGrp < noOfColumn; colGrp++) {
-      if (colGrpModel.isColumnar(colGrp)) {
-        dataHolders[colGrp] = new ColumnDataHolder(noOfRow);
-      } else {
-        ColGroupMinMax colGrpMinMax = new ColGroupMinMax(segmentProperties, ++colGrpId);
-        dataHolders[colGrp] =
-            new ColGroupDataHolder(this.columnarSplitter.getBlockKeySize()[colGrp], noOfRow,
-                colGrpMinMax);
-      }
+  private NodeHolder processDataRows(List<Object[]> dataRows)
+      throws CarbonDataWriterException {
+    if (dataRows.size() == 0) {
+      return new NodeHolder();
+    }
+    TablePage tablePage = new TablePage(dataRows.size());
+    IndexKey keys = new IndexKey(dataRows.size());
+    int rowId = 0;
+
+    // convert row to columnar data
+    for (Object[] row : dataRows) {
+      tablePage.addRow(rowId, row);
+      keys.update(rowId, row);
+      rowId++;
+    }
+
+    // encode and compress dimensions and measure
+    // TODO: To make the encoding more transparent to the user, user should be enable to specify
+    // the encoding and compression method for each type when creating table.
+
+    Codec codec = new Codec();
+    IndexStorage[] dimColumns = codec.encodeAndCompressDimensions(tablePage);
+    Codec encodedMeasure = codec.encodeAndCompressMeasures(tablePage);
+
+    // prepare nullBitSet for writer, remove this after writer can accept TablePage
+    BitSet[] nullBitSet = new BitSet[tablePage.measurePage.length];
+    for (int i = 0; i < nullBitSet.length; i++) {
+      nullBitSet[i] = tablePage.measurePage[i].getNullBitSet();
     }
-    return dataHolders;
+
+    LOGGER.info("Number Of records processed: " + dataRows.size());
+
+    // TODO: writer interface should be modified to use TablePage
+    return dataWriter.buildDataNodeHolder(dimColumns, encodedMeasure.getEncodedMeasure(),
+        dataRows.size(), keys.startKey, keys.endKey, encodedMeasure.getCompressionModel(),
+        keys.packedNoDictStartKey, keys.packedNoDictEndKey, nullBitSet);
   }
 
   /**
@@ -959,7 +788,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return count;
   }
 
-  private int getComplexColsCount() {
+  // return the number of complex column after complex columns are expanded
+  private int getExpandedComplexColsCount() {
     int count = 0;
     for (int i = 0; i < dimensionCount; i++) {
       GenericDataType complexDataType = complexIndexMap.get(i);
@@ -970,6 +800,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return count;
   }
 
+  // return the number of complex column
+  private int getComplexColumnCount() {
+    return complexIndexMap.size();
+  }
+
   /**
    * below method will be used to close the handler
    */
@@ -995,80 +830,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
-   * @param value
-   * @return it return no of value after decimal
-   */
-  private int getDecimalCount(double value) {
-    String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
-    int integerPlaces = strValue.indexOf('.');
-    int decimalPlaces = 0;
-    if (-1 != integerPlaces) {
-      decimalPlaces = strValue.length() - integerPlaces - 1;
-    }
-    return decimalPlaces;
-  }
-
-  /**
-   * This method will be used to update the max value for each measure
-   */
-  private void calculateMaxMin(Object[] max, Object[] min, int[] decimal, int[] msrIndex,
-      Object[] row) {
-    // Update row level min max
-    for (int i = 0; i < msrIndex.length; i++) {
-      int count = msrIndex[i];
-      if (row[count] != null) {
-        if (type[count] == CarbonCommonConstants.DOUBLE_MEASURE) {
-          double value = (double) row[count];
-          double maxVal = (double) max[count];
-          double minVal = (double) min[count];
-          max[count] = (maxVal > value ? max[count] : value);
-          min[count] = (minVal < value ? min[count] : value);
-          int num = getDecimalCount(value);
-          decimal[count] = (decimal[count] > num ? decimal[count] : num);
-        } else if (type[count] == CarbonCommonConstants.BIG_INT_MEASURE) {
-          long value = (long) row[count];
-          long maxVal = (long) max[count];
-          long minVal = (long) min[count];
-          max[count] = (maxVal > value ? max[count] : value);
-          min[count] = (minVal < value ? min[count] : value);
-        } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          byte[] buff = null;
-          // in compaction flow the measure with decimal type will come as spark decimal.
-          // need to convert it to byte array.
-          if (this.compactionFlow) {
-            BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
-            buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
-          } else {
-            buff = (byte[]) row[count];
-          }
-          BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
-          decimal[count] = value.scale();
-        }
-      }
-    }
-  }
-
-  /**
-   * This method will calculate the unique value which will be used as storage
-   * key for null values of measures
-   *
-   * @param minValue
-   * @param uniqueValue
-   */
-  private void calculateUniqueValue(Object[] minValue, Object[] uniqueValue) {
-    for (int i = 0; i < measureCount; i++) {
-      if (type[i] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        uniqueValue[i] = (long) minValue[i] - 1;
-      } else if (type[i] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        BigDecimal val = (BigDecimal) minValue[i];
-        uniqueValue[i] = (val.subtract(new BigDecimal(1.0)));
-      } else {
-        uniqueValue[i] = (double) minValue[i] - 1;
-      }
-    }
-  }
-
-  /**
    * Below method will be to configure fact file writing configuration
    *
    * @throws CarbonDataWriterException
@@ -1122,15 +883,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     List<Integer> customMeasureIndexList =
         new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (int j = 0; j < type.length; j++) {
-      if (type[j] != CarbonCommonConstants.BYTE_VALUE_MEASURE
-          && type[j] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+      if (type[j] != DataType.BYTE && type[j] != DataType.DECIMAL) {
         otherMeasureIndexList.add(j);
       } else {
         customMeasureIndexList.add(j);
       }
     }
-    otherMeasureIndex = new int[otherMeasureIndexList.size()];
-    customMeasureIndex = new int[customMeasureIndexList.size()];
+
+    int[] otherMeasureIndex = new int[otherMeasureIndexList.size()];
+    int[] customMeasureIndex = new int[customMeasureIndexList.size()];
     for (int i = 0; i < otherMeasureIndex.length; i++) {
       otherMeasureIndex[i] = otherMeasureIndexList.get(i);
     }
@@ -1157,7 +918,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @return all dimensions cardinality including complex dimension metadata column
    */
   private int[] getBlockKeySizeWithComplexTypes(int[] primitiveBlockKeySize) {
-    int allColsCount = getComplexColsCount();
+    int allColsCount = getExpandedComplexColsCount();
     int[] blockKeySizeWithComplexTypes =
         new int[this.colGrpModel.getNoOfColumnStore() + allColsCount];
 
@@ -1178,52 +939,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     return blockKeySizeWithComplexTypes;
   }
 
-  private CarbonWriteDataHolder initialiseKeyBlockHolder(int size) {
-    CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
-    keyDataHolder.initialiseByteArrayValuesForKey(size);
-    return keyDataHolder;
-  }
-
-  private CarbonWriteDataHolder initialiseKeyBlockHolderForNonDictionary(int size) {
-    CarbonWriteDataHolder keyDataHolder = new CarbonWriteDataHolder();
-    keyDataHolder.initialiseByteArrayValuesForNonDictionary(size);
-    return keyDataHolder;
-  }
-
-  private CarbonWriteDataHolder[] initialiseDataHolder(int size) {
-    CarbonWriteDataHolder[] dataHolder = new CarbonWriteDataHolder[this.measureCount];
-    for (int i = 0; i < otherMeasureIndex.length; i++) {
-      dataHolder[otherMeasureIndex[i]] = new CarbonWriteDataHolder();
-      if (type[otherMeasureIndex[i]] == CarbonCommonConstants.BIG_INT_MEASURE) {
-        dataHolder[otherMeasureIndex[i]].initialiseLongValues(size);
-      } else {
-        dataHolder[otherMeasureIndex[i]].initialiseDoubleValues(size);
-      }
-    }
-    for (int i = 0; i < customMeasureIndex.length; i++) {
-      dataHolder[customMeasureIndex[i]] = new CarbonWriteDataHolder();
-      dataHolder[customMeasureIndex[i]].initialiseByteArrayValues(size);
-    }
-    return dataHolder;
-  }
-
-  /**
-   * Below method will be used to get the bit set array for
-   * all the measure, which will store the indexes which are null
-   *
-   * @param measureCount
-   * @return bit set to store null value index
-   */
-  private BitSet[] getMeasureNullValueIndexBitSet(int measureCount) {
-    // creating a bit set for all the measure column
-    BitSet[] nullvalueIndexBitset = new BitSet[measureCount];
-    for (int i = 0; i < nullvalueIndexBitset.length; i++) {
-      // bitset size will be blocklet size
-      nullvalueIndexBitset[i] = new BitSet(this.blockletSize);
-    }
-    return nullvalueIndexBitset;
-  }
-
   /**
    * Below method will be used to get the fact data writer instance
    *
@@ -1255,7 +970,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setCarbonDataFileAttributes(carbonDataFileAttributes);
     carbonDataWriterVo.setDatabaseName(databaseName);
     carbonDataWriterVo.setWrapperColumnSchemaList(wrapperColumnSchemaList);
-    carbonDataWriterVo.setIsDictionaryColumn(dimensionType);
+    carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
     carbonDataWriterVo.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonDataWriterVo.setColCardinality(colCardinality);
     carbonDataWriterVo.setSegmentProperties(segmentProperties);
@@ -1482,13 +1197,185 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         }
       } else {
         if (version == ColumnarFormatVersion.V3) {
-          return new BlockIndexerStorageForNoInvertedIndexForShort(this.data, isNoDictionary);
+          return new BlockIndexerStorageForNoInvertedIndexForShort(this.data);
         } else {
-          return new BlockIndexerStorageForNoInvertedIndex(this.data, isNoDictionary);
+          return new BlockIndexerStorageForNoInvertedIndex(this.data);
         }
       }
 
     }
 
   }
+
+  public class Codec {
+    private WriterCompressModel compressionModel;
+    private byte[][] encodedMeasureArray;
+
+    Codec() {
+    }
+
+    public WriterCompressModel getCompressionModel() {
+      return compressionModel;
+    }
+
+    public byte[][] getEncodedMeasure() {
+      return encodedMeasureArray;
+    }
+
+    public Codec encodeAndCompressMeasures(TablePage tablePage) {
+      // TODO: following conversion is required only because compress model requires them,
+      // remove then after the compress framework is refactoried
+      FixLengthColumnPage[] measurePage = tablePage.measurePage;
+      int measureCount = measurePage.length;
+      Object[] min = new Object[measurePage.length];
+      Object[] max = new Object[measurePage.length];
+      Object[] uniqueValue = new Object[measurePage.length];
+      int[] decimal = new int[measurePage.length];
+      for (int i = 0; i < measurePage.length; i++) {
+        min[i] = measurePage[i].getStatistics().getMin();
+        max[i] = measurePage[i].getStatistics().getMax();
+        uniqueValue[i] = measurePage[i].getStatistics().getUniqueValue();
+        decimal[i] = measurePage[i].getStatistics().getDecimal();
+      }
+      // encode and compress measure column page
+      compressionModel = ValueCompressionUtil
+          .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[measureCount]);
+      encodedMeasureArray = encodeMeasure(compressionModel, measurePage);
+      return this;
+    }
+
+    // this method first invokes encoding routine to encode the data chunk,
+    // followed by invoking compression routine for preparing the data chunk for writing.
+    private byte[][] encodeMeasure(WriterCompressModel compressionModel,
+        FixLengthColumnPage[] columnPages) {
+
+      CarbonWriteDataHolder[] holders = new CarbonWriteDataHolder[columnPages.length];
+      for (int i = 0; i < holders.length; i++) {
+        holders[i] = new CarbonWriteDataHolder();
+        switch (columnPages[i].getDataType()) {
+          case SHORT:
+          case INT:
+          case LONG:
+            holders[i].setWritableLongPage(columnPages[i].getLongPage());
+            break;
+          case DOUBLE:
+            holders[i].setWritableDoublePage(columnPages[i].getDoublePage());
+            break;
+          case DECIMAL:
+            holders[i].setWritableDecimalPage(columnPages[i].getDecimalPage());
+            break;
+          default:
+            throw new RuntimeException("Unsupported data type: " + columnPages[i].getDataType());
+        }
+      }
+
+      DataType[] dataType = compressionModel.getDataType();
+      ValueCompressionHolder[] values =
+          new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
+      byte[][] returnValue = new byte[values.length][];
+      for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
+        values[i] = compressionModel.getValueCompressionHolder()[i];
+        if (dataType[i] != DataType.DECIMAL) {
+          values[i].setValue(
+              ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
+                  .getCompressedValues(compressionModel.getCompressionFinders()[i], holders[i],
+                      compressionModel.getMaxValue()[i],
+                      compressionModel.getMantissa()[i]));
+        } else {
+          values[i].setValue(holders[i].getWritableByteArrayValues());
+        }
+        values[i].compress();
+        returnValue[i] = values[i].getCompressedData();
+      }
+
+      return returnValue;
+    }
+
+    /**
+     * Encode and compress each column page. The work is done using a thread pool.
+     */
+    private IndexStorage[] encodeAndCompressDimensions(TablePage tablePage) {
+      int noDictionaryCount = tablePage.noDictDimensionPage.length;
+      int complexColCount = tablePage.complexDimensionPage.length;
+
+      // thread pool size to be used for encoding dimension
+      // each thread will sort the column page data and compress it
+      int thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
+              CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
+      ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
+      Callable<IndexStorage> callable;
+      List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
+          primitiveDimLens.length + noDictionaryCount + complexColCount);
+      int i = 0;
+      int dictionaryColumnCount = -1;
+      int noDictionaryColumnCount = -1;
+      int colGrpId = -1;
+      boolean isSortColumn = false;
+      for (i = 0; i < isDictDimension.length; i++) {
+        isSortColumn = i < segmentProperties.getNumberOfSortColumns();
+        if (isDictDimension[i]) {
+          dictionaryColumnCount++;
+          if (colGrpModel.isColumnar(dictionaryColumnCount)) {
+            // dictionary dimension
+            callable =
+                new BlockSortThread(
+                    i,
+                    tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage(),
+                    isSortColumn,
+                    isUseInvertedIndex[i] & isSortColumn);
+
+          } else {
+            // column group
+            callable = new ColGroupBlockStorage(
+                segmentProperties,
+                ++colGrpId,
+                tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage());
+          }
+        } else {
+          // no dictionary dimension
+          callable =
+              new BlockSortThread(
+                  i,
+                  tablePage.noDictDimensionPage[++noDictionaryColumnCount].getByteArrayPage(),
+                  false,
+                  true,
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+        }
+        // start a thread to sort the page data
+        submit.add(executorService.submit(callable));
+      }
+
+      // complex type column
+      for (int index = 0; index < getComplexColumnCount(); index++) {
+        Iterator<byte[][]> iterator = tablePage.complexDimensionPage[index].iterator();
+        while (iterator.hasNext()) {
+          callable =
+              new BlockSortThread(
+                  i++,
+                  iterator.next(),
+                  false,
+                  true);
+          submit.add(executorService.submit(callable));
+        }
+      }
+      executorService.shutdown();
+      try {
+        executorService.awaitTermination(1, TimeUnit.DAYS);
+      } catch (InterruptedException e) {
+        LOGGER.error(e, e.getMessage());
+      }
+      IndexStorage[] dimColumns = new IndexStorage[
+          colGrpModel.getNoOfColumnStore() + noDictionaryCount + getExpandedComplexColsCount()];
+      try {
+        for (int k = 0; k < dimColumns.length; k++) {
+          dimColumns[k] = submit.get(k).get();
+        }
+      } catch (Exception e) {
+        LOGGER.error(e, e.getMessage());
+      }
+      return dimColumns;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/98df130a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 6fd29d7..44958a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -18,7 +18,6 @@
 package org.apache.carbondata.processing.store;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -29,13 +28,13 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
@@ -146,10 +145,9 @@ public class CarbonFactDataHandlerModel {
   private int[] primitiveDimLens;
 
   /**
-   * array in which each character represents an aggregation type and
-   * the array length will be equal to the number of measures in table
+   * data type of all measures in the table
    */
-  private char[] aggType;
+  private DataType[] measureDataType;
   /**
    * carbon data file attributes like task id, file stamp
    */
@@ -283,8 +281,8 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
     carbonFactDataHandlerModel.setColCardinality(colCardinality);
     carbonFactDataHandlerModel.setDataWritingRequest(true);
-    carbonFactDataHandlerModel.setAggType(CarbonDataProcessorUtil
-        .getAggType(configuration.getMeasureCount(), configuration.getMeasureFields()));
+    carbonFactDataHandlerModel.setMeasureDataType(CarbonDataProcessorUtil
+        .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields()));
     carbonFactDataHandlerModel.setFactDimLens(dimLens);
     carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
     carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
@@ -340,13 +338,12 @@ public class CarbonFactDataHandlerModel {
         new HashMap<Integer, GenericDataType>(segmentProperties.getComplexDimensions().size());
     carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
     carbonFactDataHandlerModel.setDataWritingRequest(true);
-    char[] aggType = new char[segmentProperties.getMeasures().size()];
-    Arrays.fill(aggType, 'n');
+    DataType[] aggType = new DataType[segmentProperties.getMeasures().size()];
     int i = 0;
     for (CarbonMeasure msr : segmentProperties.getMeasures()) {
-      aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
+      aggType[i++] = msr.getDataType();
     }
-    carbonFactDataHandlerModel.setAggType(aggType);
+    carbonFactDataHandlerModel.setMeasureDataType(aggType);
     carbonFactDataHandlerModel.setFactDimLens(segmentProperties.getDimColumnsCardinality());
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
         .checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), loadModel.getDatabaseName(),
@@ -501,12 +498,12 @@ public class CarbonFactDataHandlerModel {
     this.primitiveDimLens = primitiveDimLens;
   }
 
-  public char[] getAggType() {
-    return aggType;
+  public DataType[] getMeasureDataType() {
+    return measureDataType;
   }
 
-  public void setAggType(char[] aggType) {
-    this.aggType = aggType;
+  public void setMeasureDataType(DataType[] measureDataType) {
+    this.measureDataType = measureDataType;
   }
 
   public String getCarbonDataDirectoryPath() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/98df130a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index f8454f1..6a33f34 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChunkHolder;
@@ -93,7 +94,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    */
   private String tempFileLocation;
 
-  private char[] aggType;
+  private DataType[] measureDataType;
 
   /**
    * below code is to check whether dimension
@@ -105,13 +106,13 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
   public SingleThreadFinalSortFilesMerger(String tempFileLocation, String tableName,
       int dimensionCount, int complexDimensionCount, int measureCount, int noDictionaryCount,
-      char[] aggType, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
+      DataType[] type, boolean[] isNoDictionaryColumn, boolean[] isNoDictionarySortColumn) {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.dimensionCount = dimensionCount;
     this.complexDimensionCount = complexDimensionCount;
     this.measureCount = measureCount;
-    this.aggType = aggType;
+    this.measureDataType = type;
     this.noDictionaryCount = noDictionaryCount;
     this.isNoDictionaryColumn = isNoDictionaryColumn;
     this.isNoDictionarySortColumn = isNoDictionarySortColumn;
@@ -183,8 +184,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
           // create chunk holder
           SortTempFileChunkHolder sortTempFileChunkHolder =
               new SortTempFileChunkHolder(tempFile, dimensionCount, complexDimensionCount,
-                  measureCount, fileBufferSize, noDictionaryCount, aggType, isNoDictionaryColumn,
-                  isNoDictionarySortColumn);
+                  measureCount, fileBufferSize, noDictionaryCount, measureDataType,
+                  isNoDictionaryColumn, isNoDictionarySortColumn);
 
           // initialize
           sortTempFileChunkHolder.initialize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/98df130a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
index 2049bec..def2aaa 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
@@ -14,10 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.carbondata.processing.store.colgroup;
 
 import java.util.concurrent.Callable;
 
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 
 /**
@@ -25,8 +27,22 @@ import org.apache.carbondata.core.datastore.columnar.IndexStorage;
  */
 public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> {
 
+  private byte[][] data;
+
+  private ColGroupMinMax colGrpMinMax;
+
+  public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex, byte[][] data) {
+    colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex);
+    this.data = data;
+    for (int i = 0; i < data.length; i++) {
+      colGrpMinMax.add(data[i]);
+    }
+  }
+
+  @Deprecated
   private ColGroupDataHolder colGrpDataHolder;
 
+  @Deprecated
   public ColGroupBlockStorage(DataHolder colGrpDataHolder) {
     this.colGrpDataHolder = (ColGroupDataHolder) colGrpDataHolder;
   }
@@ -58,7 +74,7 @@ public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage
    * for column group storage its not required
    */
   @Override public byte[][] getKeyBlock() {
-    return colGrpDataHolder.getData();
+    return data;
   }
 
   /**
@@ -73,22 +89,19 @@ public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage
    * for column group storage its not required
    */
   @Override public int getTotalSize() {
-    return colGrpDataHolder.getTotalSize();
+    return data.length;
   }
 
-  /**
-   * Get min max of column group storage
-   */
   @Override public byte[] getMin() {
-    return colGrpDataHolder.getMin();
+    return colGrpMinMax.getMin();
   }
 
   @Override public byte[] getMax() {
-    return colGrpDataHolder.getMax();
+    return colGrpMinMax.getMax();
   }
 
   /**
-   * return
+   * return self
    */
   @Override public IndexStorage call() throws Exception {
     return this;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/98df130a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index bb80d1e..18f1b2e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -105,10 +105,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     int[] keyLengths = new int[keyStorageArray.length];
 
     // below will calculate min and max value for each column
-    // for below 2d array, first index will be for column and second will be min
-    // max
+    // for below 2d array, first index will be for column and second will be min and max
     // value for same column
-    // byte[][] columnMinMaxData = new byte[keyStorageArray.length][];
 
     byte[][] dimensionMinValue = new byte[keyStorageArray.length][];
     byte[][] dimensionMaxValue = new byte[keyStorageArray.length][];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/98df130a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index c6eeb4c..3c090fe 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.util;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,7 +47,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
@@ -406,31 +404,28 @@ public final class CarbonDataProcessorUtil {
     return columnNames;
   }
 
-  /**
-   * get agg type
-   */
-  public static char[] getAggType(int measureCount, String databaseName, String tableName) {
-    char[] aggType = new char[measureCount];
-    Arrays.fill(aggType, 'n');
+
+  public static DataType[] getMeasureDataType(int measureCount, String databaseName,
+      String tableName) {
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = DataType.DOUBLE;
+    }
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
         databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
     List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
-    for (int i = 0; i < aggType.length; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
+    for (int i = 0; i < type.length; i++) {
+      type[i] = measures.get(i).getDataType();
     }
-    return aggType;
+    return type;
   }
 
-  /**
-   * get agg type
-   */
-  public static char[] getAggType(int measureCount, DataField[] measureFields) {
-    char[] aggType = new char[measureCount];
-    Arrays.fill(aggType, 'n');
-    for (int i = 0; i < measureFields.length; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measureFields[i].getColumn().getDataType());
+  public static DataType[] getMeasureDataType(int measureCount, DataField[] measureFields) {
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = measureFields[i].getColumn().getDataType();
     }
-    return aggType;
+    return type;
   }
 
   /**
@@ -509,16 +504,19 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * initialise aggregation type for measures for their storage format
+   * initialise data type for measures for their storage format
    */
-  public static char[] initAggType(CarbonTable carbonTable, String tableName, int measureCount) {
-    char[] aggType = new char[measureCount];
-    Arrays.fill(aggType, 'n');
+  public static DataType[] initDataType(CarbonTable carbonTable, String tableName,
+      int measureCount) {
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = DataType.DOUBLE;
+    }
     List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
     for (int i = 0; i < measureCount; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
+      type[i] = measures.get(i).getDataType();
     }
-    return aggType;
+    return type;
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/98df130a/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java b/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
index da10d7a..038ac03 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/store/colgroup/ColGroupMinMaxTest.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 /**
@@ -159,6 +160,7 @@ public class ColGroupMinMaxTest {
 		}
 	}
 
+	@Ignore
 	@Test
 	public void testRowStoreMinMax() throws KeyGenException {