Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/10 03:46:38 UTC

[2/6] carbondata git commit: extract interface

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index edc7ece..53d5dcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -19,8 +19,6 @@ package org.apache.carbondata.processing.store;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -36,17 +34,10 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
@@ -57,17 +48,11 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.NodeHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
-import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.store.writer.Encoder;
 
 /**
  * Fact data handler class to handle the fact data
@@ -108,7 +93,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * keyBlockHolder
    */
   private CarbonKeyBlockHolder[] keyBlockHolder;
-  private boolean[] aggKeyBlock;
+
+  // This variable is true if the column is a dictionary dimension and its cardinality is
+  // lower than the property CarbonCommonConstants.HIGH_CARDINALITY_VALUE.
+  // It decides whether RLE encoding is applied on the data page for this dimension.
+  private boolean[] rleEncodingForDictDimension;
   private boolean[] isNoDictionary;
   private long processedDataCount;
   private ExecutorService producerExecutorService;
@@ -117,7 +106,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private List<Future<Void>> consumerExecutorServiceTaskList;
   private List<CarbonRow> dataRows;
   private ColumnGroupModel colGrpModel;
-  private boolean[] isUseInvertedIndex;
   /**
    * semaphore which will used for managing node holder objects
    */
@@ -150,15 +138,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private boolean[] isDictDimension;
 
-  private int bucketNumber;
-
-  private int taskExtension;
-
   /**
    * current data format version
    */
   private ColumnarFormatVersion version;
 
+  private DefaultEncoder encoder;
+
   /**
    * CarbonFactDataHandler constructor
    */
@@ -168,20 +154,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
     int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
         + getExpandedComplexColsCount();
-    this.aggKeyBlock = new boolean[numDimColumns];
+    this.rleEncodingForDictDimension = new boolean[numDimColumns];
     this.isNoDictionary = new boolean[numDimColumns];
-    this.bucketNumber = model.getBucketId();
-    this.taskExtension = model.getTaskExtension();
-    this.isUseInvertedIndex = new boolean[numDimColumns];
-    if (null != model.getIsUseInvertedIndex()) {
-      for (int i = 0; i < isUseInvertedIndex.length; i++) {
-        if (i < model.getIsUseInvertedIndex().length) {
-          isUseInvertedIndex[i] = model.getIsUseInvertedIndex()[i];
-        } else {
-          isUseInvertedIndex[i] = true;
-        }
-      }
-    }
+
     int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
     // setting true value for dims of high card
     for (int i = 0; i < model.getNoDictionaryCount(); i++) {
@@ -198,37 +173,37 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
               CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
               CarbonCommonConstants.HIGH_CARDINALITY_VALUE_DEFAULTVALUE));
       int[] columnSplits = colGrpModel.getColumnSplit();
-      int dimCardinalityIndex = 0;
-      int aggIndex = 0;
+      int dimCardinalityIndex = -1;
+      int aggIndex = -1;
       int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
       for (int i = 0; i < columnSplits.length; i++) {
-        if (colGrpModel.isColumnar(i) && dimLens[dimCardinalityIndex] < noDictionaryValue) {
-          this.aggKeyBlock[aggIndex++] = true;
-          continue;
-        }
         dimCardinalityIndex += columnSplits[i];
         aggIndex++;
+        if (colGrpModel.isColumnar(i) && dimLens[dimCardinalityIndex] < noDictionaryValue) {
+          this.rleEncodingForDictDimension[aggIndex] = true;
+        }
       }
 
       if (model.getDimensionCount() < dimLens.length) {
         int allColsCount = getColsCount(model.getDimensionCount());
-        List<Boolean> aggKeyBlockWithComplex = new ArrayList<Boolean>(allColsCount);
+        List<Boolean> rleWithComplex = new ArrayList<Boolean>(allColsCount);
         for (int i = 0; i < model.getDimensionCount(); i++) {
           GenericDataType complexDataType = model.getComplexIndexMap().get(i);
           if (complexDataType != null) {
-            complexDataType.fillAggKeyBlock(aggKeyBlockWithComplex, this.aggKeyBlock);
+            complexDataType.fillAggKeyBlock(rleWithComplex, this.rleEncodingForDictDimension);
           } else {
-            aggKeyBlockWithComplex.add(this.aggKeyBlock[i]);
+            rleWithComplex.add(this.rleEncodingForDictDimension[i]);
           }
         }
-        this.aggKeyBlock = new boolean[allColsCount];
+        this.rleEncodingForDictDimension = new boolean[allColsCount];
         for (int i = 0; i < allColsCount; i++) {
-          this.aggKeyBlock[i] = aggKeyBlockWithComplex.get(i);
+          this.rleEncodingForDictDimension[i] = rleWithComplex.get(i);
         }
       }
-      aggKeyBlock = arrangeUniqueBlockType(aggKeyBlock);
+      rleEncodingForDictDimension = arrangeUniqueBlockType(rleEncodingForDictDimension);
     }
-    version = CarbonProperties.getInstance().getFormatVersion();
+    this.version = CarbonProperties.getInstance().getFormatVersion();
+    this.encoder = new DefaultEncoder(model);
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {
@@ -357,87 +332,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
   }
 
-  class IndexKey {
-    private int pageSize;
-    byte[] currentMDKey = null;
-    byte[][] currentNoDictionaryKey = null;
-    byte[] startKey = null;
-    byte[] endKey = null;
-    byte[][] noDictStartKey = null;
-    byte[][] noDictEndKey = null;
-    byte[] packedNoDictStartKey = null;
-    byte[] packedNoDictEndKey = null;
-
-    IndexKey(int pageSize) {
-      this.pageSize = pageSize;
-    }
-
-    /** update all keys based on the input row */
-    void update(int rowId, CarbonRow row) throws KeyGenException {
-      currentMDKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
-      if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
-        currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
-      }
-      if (rowId == 0) {
-        startKey = currentMDKey;
-        noDictStartKey = currentNoDictionaryKey;
-      }
-      endKey = currentMDKey;
-      noDictEndKey = currentNoDictionaryKey;
-      if (rowId == pageSize - 1) {
-        finalizeKeys();
-      }
-    }
-
-    /** update all keys if SORT_COLUMNS option is used when creating table */
-    private void finalizeKeys() {
-      // If SORT_COLUMNS is used, may need to update start/end keys since the they may
-      // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
-      // start/end key
-      int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
-      if (numberOfDictSortColumns > 0) {
-        // if SORT_COLUMNS contain dictionary columns
-        int[] keySize = columnarSplitter.getBlockKeySize();
-        if (keySize.length > numberOfDictSortColumns) {
-          // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
-          int newMdkLength = 0;
-          for (int i = 0; i < numberOfDictSortColumns; i++) {
-            newMdkLength += keySize[i];
-          }
-          byte[] newStartKeyOfSortKey = new byte[newMdkLength];
-          byte[] newEndKeyOfSortKey = new byte[newMdkLength];
-          System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
-          System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
-          startKey = newStartKeyOfSortKey;
-          endKey = newEndKeyOfSortKey;
-        }
-      } else {
-        startKey = new byte[0];
-        endKey = new byte[0];
-      }
-
-      // Do the same update for noDictionary start/end Key
-      int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
-      if (numberOfNoDictSortColumns > 0) {
-        // if sort_columns contain no-dictionary columns
-        if (noDictStartKey.length > numberOfNoDictSortColumns) {
-          byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
-          byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
-          System.arraycopy(
-              noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
-          System.arraycopy(
-              noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
-          noDictStartKey = newNoDictionaryStartKey;
-          noDictEndKey = newNoDictionaryEndKey;
-        }
-        packedNoDictStartKey =
-            NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
-        packedNoDictEndKey =
-            NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
-      }
-    }
-  }
-
   /**
    * generate the NodeHolder from the input rows (one page in case of V3 format)
    */
@@ -447,7 +341,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       return new NodeHolder();
     }
     TablePage tablePage = new TablePage(model, dataRows.size());
-    IndexKey keys = new IndexKey(dataRows.size());
+    TablePageKey keys = new TablePageKey(model, dataRows.size());
     int rowId = 0;
 
     // convert row to columnar data
@@ -458,26 +352,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
 
     // encode and compress dimensions and measure
-    // TODO: To make the encoding more transparent to the user, user should be enable to specify
-    // the encoding and compression method for each type when creating table.
-
-    Codec codec = new Codec(model.getMeasureDataType());
-    IndexStorage[] dimColumns = codec.encodeAndCompressDimensions(tablePage);
-    Codec encodedMeasure = codec.encodeAndCompressMeasures(tablePage);
-
-    // prepare nullBitSet for writer, remove this after writer can accept TablePage
-    BitSet[] nullBitSet = new BitSet[tablePage.getMeasurePage().length];
-    FixLengthColumnPage[] measurePages = tablePage.getMeasurePage();
-    for (int i = 0; i < nullBitSet.length; i++) {
-      nullBitSet[i] = measurePages[i].getNullBitSet();
-    }
+    Encoder.EncodedData encodedData = encoder.encode(tablePage);
+
+    TablePageStatistics tablePageStatistics = new TablePageStatistics(
+        model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats());
 
     LOGGER.info("Number Of records processed: " + dataRows.size());
 
     // TODO: writer interface should be modified to use TablePage
-    return dataWriter.buildDataNodeHolder(dimColumns, encodedMeasure.getEncodedMeasure(),
-        dataRows.size(), keys.startKey, keys.endKey, encodedMeasure.getCompressionModel(),
-        keys.packedNoDictStartKey, keys.packedNoDictEndKey, nullBitSet);
+    return dataWriter.buildDataNodeHolder(encodedData, tablePageStatistics, keys);
   }
 
   /**
@@ -553,15 +436,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   // return the number of complex column after complex columns are expanded
   private int getExpandedComplexColsCount() {
-    int count = 0;
-    int dictDimensionCount = model.getDimensionCount();
-    for (int i = 0; i < dictDimensionCount; i++) {
-      GenericDataType complexDataType = model.getComplexIndexMap().get(i);
-      if (complexDataType != null) {
-        count += complexDataType.getColsCount();
-      }
-    }
-    return count;
+    return model.getExpandedComplexColsCount();
   }
 
   // return the number of complex column
@@ -722,9 +597,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
     carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
     carbonDataWriterVo.setTableName(model.getTableName());
-    carbonDataWriterVo.setKeyBlockSize(keyBlockSize);
     carbonDataWriterVo.setFileManager(fileManager);
-    carbonDataWriterVo.setAggBlocks(aggKeyBlock);
+    carbonDataWriterVo.setRleEncodingForDictDim(rleEncodingForDictDimension);
     carbonDataWriterVo.setIsComplexType(isComplexTypes());
     carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
     carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
@@ -735,8 +609,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     carbonDataWriterVo.setColCardinality(model.getColCardinality());
     carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
     carbonDataWriterVo.setTableBlocksize(model.getBlockSizeInMB());
-    carbonDataWriterVo.setBucketNumber(bucketNumber);
-    carbonDataWriterVo.setTaskExtension(taskExtension);
+    carbonDataWriterVo.setBucketNumber(model.getBucketId());
+    carbonDataWriterVo.setTaskExtension(model.getTaskExtension());
     carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
     return carbonDataWriterVo;
   }
@@ -917,233 +791,4 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       return null;
     }
   }
-
-  private final class BlockSortThread implements Callable<IndexStorage> {
-    private int index;
-
-    private byte[][] data;
-    private boolean isSortRequired;
-    private boolean isCompressionReq;
-    private boolean isUseInvertedIndex;
-
-    private boolean isNoDictionary;
-
-    private BlockSortThread(int index, byte[][] data, boolean isSortRequired,
-        boolean isUseInvertedIndex) {
-      this.index = index;
-      this.data = data;
-      isCompressionReq = aggKeyBlock[this.index];
-      this.isSortRequired = isSortRequired;
-      this.isUseInvertedIndex = isUseInvertedIndex;
-    }
-
-    public BlockSortThread(byte[][] data, boolean compression, boolean isNoDictionary,
-        boolean isSortRequired, boolean isUseInvertedIndex) {
-      this.data = data;
-      this.isCompressionReq = compression;
-      this.isNoDictionary = isNoDictionary;
-      this.isSortRequired = isSortRequired;
-      this.isUseInvertedIndex = isUseInvertedIndex;
-    }
-
-    @Override public IndexStorage call() throws Exception {
-      if (index == 1) {
-        int dd = 1 + 1;
-      }
-      if (isUseInvertedIndex) {
-        if (version == ColumnarFormatVersion.V3) {
-          return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
-              isSortRequired);
-        } else {
-          return new BlockIndexerStorageForInt(this.data, isCompressionReq, isNoDictionary,
-              isSortRequired);
-        }
-      } else {
-        if (version == ColumnarFormatVersion.V3) {
-          return new BlockIndexerStorageForNoInvertedIndexForShort(this.data,isNoDictionary);
-        } else {
-          return new BlockIndexerStorageForNoInvertedIndex(this.data);
-        }
-      }
-
-    }
-
-  }
-
-  public class Codec {
-    private WriterCompressModel compressionModel;
-    private byte[][] encodedMeasureArray;
-    private DataType[] measureType;
-
-    Codec(DataType[] measureType) {
-      this.measureType = measureType;
-    }
-
-    public WriterCompressModel getCompressionModel() {
-      return compressionModel;
-    }
-
-    public byte[][] getEncodedMeasure() {
-      return encodedMeasureArray;
-    }
-
-    public Codec encodeAndCompressMeasures(TablePage tablePage) {
-      // TODO: following conversion is required only because compress model requires them,
-      // remove then after the compress framework is refactoried
-      FixLengthColumnPage[] measurePage = tablePage.getMeasurePage();
-      int measureCount = measurePage.length;
-      Object[] min = new Object[measurePage.length];
-      Object[] max = new Object[measurePage.length];
-      Object[] uniqueValue = new Object[measurePage.length];
-      int[] decimal = new int[measurePage.length];
-      for (int i = 0; i < measurePage.length; i++) {
-        min[i] = measurePage[i].getStatistics().getMin();
-        max[i] = measurePage[i].getStatistics().getMax();
-        uniqueValue[i] = measurePage[i].getStatistics().getUniqueValue();
-        decimal[i] = measurePage[i].getStatistics().getDecimal();
-      }
-      // encode and compress measure column page
-      compressionModel =
-          ValueCompressionUtil.getWriterCompressModel(max, min, decimal, uniqueValue, measureType,
-              new byte[measureCount]);
-      encodedMeasureArray = encodeMeasure(compressionModel, measurePage);
-      return this;
-    }
-
-    // this method first invokes encoding routine to encode the data chunk,
-    // followed by invoking compression routine for preparing the data chunk for writing.
-    private byte[][] encodeMeasure(WriterCompressModel compressionModel,
-        FixLengthColumnPage[] columnPages) {
-
-      CarbonWriteDataHolder[] holders = new CarbonWriteDataHolder[columnPages.length];
-      for (int i = 0; i < holders.length; i++) {
-        holders[i] = new CarbonWriteDataHolder();
-        switch (columnPages[i].getDataType()) {
-          case SHORT:
-          case INT:
-          case LONG:
-            holders[i].setWritableLongPage(columnPages[i].getLongPage());
-            break;
-          case DOUBLE:
-            holders[i].setWritableDoublePage(columnPages[i].getDoublePage());
-            break;
-          case DECIMAL:
-            holders[i].setWritableDecimalPage(columnPages[i].getDecimalPage());
-            break;
-          default:
-            throw new RuntimeException("Unsupported data type: " + columnPages[i].getDataType());
-        }
-      }
-
-      DataType[] dataType = compressionModel.getDataType();
-      ValueCompressionHolder[] values =
-          new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
-      byte[][] returnValue = new byte[values.length][];
-      for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
-        values[i] = compressionModel.getValueCompressionHolder()[i];
-        if (dataType[i] != DataType.DECIMAL) {
-          values[i].setValue(
-              ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
-                  .getCompressedValues(compressionModel.getCompressionFinders()[i], holders[i],
-                      compressionModel.getMaxValue()[i],
-                      compressionModel.getMantissa()[i]));
-        } else {
-          values[i].setValue(holders[i].getWritableByteArrayValues());
-        }
-        values[i].compress();
-        returnValue[i] = values[i].getCompressedData();
-      }
-
-      return returnValue;
-    }
-
-    /**
-     * Encode and compress each column page. The work is done using a thread pool.
-     */
-    private IndexStorage[] encodeAndCompressDimensions(TablePage tablePage) {
-      int noDictionaryCount = tablePage.getNoDictDimensionPage().length;
-      int complexColCount = tablePage.getComplexDimensionPage().length;
-
-      // thread pool size to be used for encoding dimension
-      // each thread will sort the column page data and compress it
-      int thread_pool_size = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.NUM_CORES_BLOCK_SORT,
-              CarbonCommonConstants.NUM_CORES_BLOCK_SORT_DEFAULT_VAL));
-      ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
-      Callable<IndexStorage> callable;
-      List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
-          model.getPrimitiveDimLens().length + noDictionaryCount + complexColCount);
-      int i = 0;
-      int dictionaryColumnCount = -1;
-      int noDictionaryColumnCount = -1;
-      int colGrpId = -1;
-      boolean isSortColumn = false;
-      SegmentProperties segmentProperties = model.getSegmentProperties();
-      for (i = 0; i < isDictDimension.length; i++) {
-        isSortColumn = i < segmentProperties.getNumberOfSortColumns();
-        if (isDictDimension[i]) {
-          dictionaryColumnCount++;
-          if (colGrpModel.isColumnar(dictionaryColumnCount)) {
-            // dictionary dimension
-            callable =
-                new BlockSortThread(
-                    tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount),
-                    true,
-                    false,
-                    isSortColumn,
-                    isUseInvertedIndex[i] & isSortColumn);
-
-          } else {
-            // column group
-            callable = new ColGroupBlockStorage(
-                segmentProperties,
-                ++colGrpId,
-                tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount));
-          }
-        } else {
-          // no dictionary dimension
-          callable =
-              new BlockSortThread(
-                  tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
-                  false,
-                  true,
-                  isSortColumn,
-                  isUseInvertedIndex[i] & isSortColumn);
-        }
-        // start a thread to sort the page data
-        submit.add(executorService.submit(callable));
-      }
-
-      // complex type column
-      for (int index = 0; index < getComplexColumnCount(); index++) {
-        Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[index].iterator();
-        while (iterator.hasNext()) {
-          byte[][] data = iterator.next();
-          callable =
-              new BlockSortThread(
-                  i++,
-                  data,
-                  false,
-                  true);
-          submit.add(executorService.submit(callable));
-        }
-      }
-      executorService.shutdown();
-      try {
-        executorService.awaitTermination(1, TimeUnit.DAYS);
-      } catch (InterruptedException e) {
-        LOGGER.error(e, e.getMessage());
-      }
-      IndexStorage[] dimColumns = new IndexStorage[
-          colGrpModel.getNoOfColumnStore() + noDictionaryCount + getExpandedComplexColsCount()];
-      try {
-        for (int k = 0; k < dimColumns.length; k++) {
-          dimColumns[k] = submit.get(k).get();
-        }
-      } catch (Exception e) {
-        LOGGER.error(e, e.getMessage());
-      }
-      return dimColumns;
-    }
-  }
-}
+}
\ No newline at end of file
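
With the IndexKey, BlockSortThread, and Codec inner classes removed, page processing in this handler reduces to three steps: build a TablePage from the rows, encode it through the extracted Encoder interface, and hand the encoded result to the writer. A hedged sketch of the resulting flow, using the names visible in the diff above (the row-to-columnar call is an assumed name):

    TablePage tablePage = new TablePage(model, dataRows.size());
    TablePageKey keys = new TablePageKey(model, dataRows.size());
    int rowId = 0;
    for (CarbonRow row : dataRows) {
      tablePage.addRow(rowId, row);  // assumed name: converts one row to columnar layout
      keys.update(rowId, row);       // tracks start/end keys; finalized on the last row
      rowId++;
    }
    Encoder.EncodedData encodedData = encoder.encode(tablePage);
    TablePageStatistics stats = new TablePageStatistics(
        model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats());
    NodeHolder holder = dataWriter.buildDataNodeHolder(encodedData, stats, keys);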

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index df27dcc..d400a6d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -37,7 +39,6 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
@@ -153,13 +154,14 @@ public class CarbonFactDataHandlerModel {
   // key generator for complex dimension
   private KeyGenerator[] complexDimensionKeyGenerator;
 
+  private TableSpec tableSpec;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
    */
   public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
       CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId,
       int taskExtension) {
-
     CarbonTableIdentifier identifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     boolean[] isUseInvertedIndex =
@@ -247,6 +249,7 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.bucketId = bucketId;
     carbonFactDataHandlerModel.segmentId = configuration.getSegmentId();
     carbonFactDataHandlerModel.taskExtension = taskExtension;
+    carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec();
     return carbonFactDataHandlerModel;
   }
 
@@ -304,6 +307,10 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+
+    carbonFactDataHandlerModel.tableSpec = new TableSpec(
+        segmentProperties.getDimensions(),
+        segmentProperties.getMeasures());
     return carbonFactDataHandlerModel;
   }
 
@@ -522,5 +529,26 @@ public class CarbonFactDataHandlerModel {
   public int getComplexColumnCount() {
     return complexIndexMap.size();
   }
+
+  // return the number of complex column after complex columns are expanded
+  public int getExpandedComplexColsCount() {
+    int count = 0;
+    int dictDimensionCount = getDimensionCount();
+    for (int i = 0; i < dictDimensionCount; i++) {
+      GenericDataType complexDataType = getComplexIndexMap().get(i);
+      if (complexDataType != null) {
+        count += complexDataType.getColsCount();
+      }
+    }
+    return count;
+  }
+
+  public boolean isSortColumn(int columnIndex) {
+    return columnIndex < segmentProperties.getNumberOfSortColumns();
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
 }
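
The handler-side helpers move into the model so that both the handler and the new encoder can share them. A hedged usage sketch (the loop is illustrative; isSortColumn() and getIsUseInvertedIndex() are from the diff):

    int numDims = model.getSegmentProperties().getDimensions().size();
    for (int i = 0; i < numDims; i++) {
      // sort columns are laid out first in SegmentProperties, so an index
      // comparison is sufficient
      boolean isSortColumn = model.isSortColumn(i);
      boolean useInvertedIndex = model.getIsUseInvertedIndex()[i] && isSortColumn;
      // pick the BlockIndexerStorage variant accordingly (see DefaultEncoder)
    }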
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
index 16d2da0..260cfc7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
@@ -17,8 +17,8 @@
 
 package org.apache.carbondata.processing.store;
 
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
 
 public interface CarbonFactHandler {
   void initialise() throws CarbonDataWriterException;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
new file mode 100644
index 0000000..73c4fa1
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import org.apache.carbondata.core.compression.ValueCompressor;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.processing.store.writer.Encoder;
+
+// Default encoder for encoding dimensions and measures. For dimensions, it applies RLE and
+// inverted index encoding. For measures, it applies delta or adaptive encoding.
+public class DefaultEncoder implements Encoder {
+
+  private ColumnarFormatVersion version;
+
+  private boolean[] isUseInvertedIndex;
+
+  private CarbonFactDataHandlerModel model;
+
+  public DefaultEncoder(CarbonFactDataHandlerModel model) {
+    this.version = CarbonProperties.getInstance().getFormatVersion();
+    this.model = model;
+    this.isUseInvertedIndex = model.getIsUseInvertedIndex();
+  }
+
+  // function to encode all columns in one table page
+  public Encoder.EncodedData encode(TablePage tablePage) {
+    Encoder.EncodedData encodedData = new Encoder.EncodedData();
+    encodeAndCompressDimensions(tablePage, encodedData);
+    encodeAndCompressMeasures(tablePage, encodedData);
+    return encodedData;
+  }
+
+  // encode measure and set encodedData in `encodedData`
+  private void encodeAndCompressMeasures(TablePage tablePage, Encoder.EncodedData encodedData) {
+    // TODO: following conversion is required only because the compression model requires it,
+    // remove it after the compression framework is refactored
+    ColumnPage[] measurePage = tablePage.getMeasurePage();
+    int measureCount = measurePage.length;
+    byte[] dataTypeSelected = new byte[measureCount];
+    CompressionFinder[] finders = new CompressionFinder[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      ColumnPageStatistics stats = measurePage[i].getStatistics();
+      finders[i] = ValueCompressionUtil.getCompressionFinder(
+          stats.getMax(),
+          stats.getMin(),
+          stats.getDecimal(),
+          measurePage[i].getDataType(), dataTypeSelected[i]);
+    }
+
+    //CompressionFinder[] finders = compressionModel.getCompressionFinders();
+    ValueCompressionHolder[] holders = ValueCompressionUtil.getValueCompressionHolder(finders);
+    encodedData.measures = encodeMeasure(holders, finders, measurePage);
+  }
+
+  // this method first invokes encoding routine to encode the data chunk,
+  // followed by invoking compression routine for preparing the data chunk for writing.
+  private byte[][] encodeMeasure(ValueCompressionHolder[] holders,
+      CompressionFinder[] finders,
+      ColumnPage[] columnPages) {
+    ValueCompressionHolder[] values = new ValueCompressionHolder[columnPages.length];
+    byte[][] encodedMeasures = new byte[values.length][];
+    for (int i = 0; i < columnPages.length; i++) {
+      values[i] = holders[i];
+      if (columnPages[i].getDataType() != DataType.DECIMAL) {
+        ValueCompressor compressor =
+            ValueCompressionUtil.getValueCompressor(finders[i]);
+        Object compressed = compressor.getCompressedValues(
+            finders[i],
+            columnPages[i],
+            columnPages[i].getStatistics().getMax(),
+            columnPages[i].getStatistics().getDecimal());
+        values[i].setValue(compressed);
+      } else {
+        // in case of decimal, 'flatten' the byte[][] to byte[]
+        byte[][] decimalPage = columnPages[i].getDecimalPage();
+        int totalSize = 0;
+        for (byte[] decimal : decimalPage) {
+          totalSize += decimal.length;
+        }
+        ByteBuffer temp = ByteBuffer.allocate(totalSize);
+        for (byte[] decimal : decimalPage) {
+          temp.put(decimal);
+        }
+        values[i].setValue(temp.array());
+      }
+      values[i].compress();
+      encodedMeasures[i] = values[i].getCompressedData();
+    }
+
+    return encodedMeasures;
+  }
+
+  private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex) {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, true, false, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, true, false, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndex(data);
+      }
+    }
+  }
+
+  private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex) {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, false, false, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, false, false, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndex(data);
+      }
+    }
+  }
+
+  private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
+    if (version == ColumnarFormatVersion.V3) {
+      return new BlockIndexerStorageForShort(data, false, false, false);
+    } else {
+      return new BlockIndexerStorageForInt(data, false, false, false);
+    }
+  }
+
+  private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex) {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, false, true, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, false, true, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndex(data);
+      }
+    }
+  }
+
+  // encode and compress each dimension, set encoded data in `encodedData`
+  private void encodeAndCompressDimensions(TablePage tablePage, Encoder.EncodedData encodedData) {
+    TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
+    int dictionaryColumnCount = -1;
+    int noDictionaryColumnCount = -1;
+    int colGrpId = -1;
+    int indexStorageOffset = 0;
+    IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
+    SegmentProperties segmentProperties = model.getSegmentProperties();
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    byte[][] compressedColumns = new byte[indexStorages.length][];
+    for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
+      byte[] flattened;
+      boolean isSortColumn = model.isSortColumn(i);
+      switch (dimensionSpec.getType(i)) {
+        case GLOBAL_DICTIONARY:
+          // dictionary dimension
+          indexStorages[indexStorageOffset] =
+              encodeAndCompressDictDimension(
+                  tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+          break;
+        case DIRECT_DICTIONARY:
+          // timestamp and date column
+          indexStorages[indexStorageOffset] =
+              encodeAndCompressDirectDictDimension(
+                  tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+          break;
+        case PLAIN_VALUE:
+          // high cardinality dimension, encoded as plain string
+          indexStorages[indexStorageOffset] =
+              encodeAndCompressNoDictDimension(
+                  tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getStringPage(),
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+          break;
+        case COLUMN_GROUP:
+          // column group
+          indexStorages[indexStorageOffset] =
+              new ColGroupBlockStorage(
+                  segmentProperties,
+                  ++colGrpId,
+                  tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount));
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+          break;
+        case COMPLEX:
+          // complex columns are appended at the end, so skip them here
+          continue;
+        default:
+          throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
+      }
+      compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+      indexStorageOffset++;
+    }
+
+    // handle complex type column
+    for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
+      Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
+      while (iterator.hasNext()) {
+        byte[][] data = iterator.next();
+        indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
+        byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+        compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+        indexStorageOffset++;
+      }
+    }
+
+    encodedData.indexStorages = indexStorages;
+    encodedData.dimensions = compressedColumns;
+  }
+
+}
\ No newline at end of file
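
From the handler's point of view the whole class collapses into a single call. A hedged sketch of consuming the encoder output (the EncodedData fields are the ones assigned in encodeAndCompressDimensions and encodeAndCompressMeasures above):

    Encoder encoder = new DefaultEncoder(model);
    Encoder.EncodedData encoded = encoder.encode(tablePage);
    byte[][] dimensions = encoded.dimensions;      // dimension pages, compressed by CompressorFactory's compressor
    byte[][] measures = encoded.measures;          // measure pages after value compression
    IndexStorage[] index = encoded.indexStorages;  // RLE / inverted-index metadata per dimension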

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
index 6a33f34..9b81979 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java
@@ -30,11 +30,11 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortTempFileChunkHolder;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 0c02980..fb7ebfb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -25,17 +25,17 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.GenericDataType;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
 import org.apache.carbondata.core.datastore.page.KeyColumnPage;
-import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 import org.apache.spark.sql.types.Decimal;
 
@@ -43,7 +43,7 @@ import org.apache.spark.sql.types.Decimal;
  * Represent a page data for all columns, we store its data in columnar layout, so that
  * all processing apply to TablePage can be done in vectorized fashion.
  */
-class TablePage {
+public class TablePage {
 
   // For all dimension and measure columns, we store the column data directly in the page,
   // the length of the page is the number of rows.
@@ -51,9 +51,11 @@ class TablePage {
   // TODO: we should have separate class for key columns so that keys are stored together in
   // one vector to make it efficient for sorting
   private KeyColumnPage keyColumnPage;
-  private VarLengthColumnPage[] noDictDimensionPage;
+  private ColumnPage[] noDictDimensionPage;
   private ComplexColumnPage[] complexDimensionPage;
-  private FixLengthColumnPage[] measurePage;
+  private ColumnPage[] measurePage;
+
+  private MeasurePageStatsVO measurePageStatistics;
 
   // the num of rows in this page, it must be less than short value (65536)
   private int pageSize;
@@ -65,9 +67,9 @@ class TablePage {
     this.pageSize = pageSize;
     keyColumnPage = new KeyColumnPage(pageSize,
         model.getSegmentProperties().getDimensionPartitions().length);
-    noDictDimensionPage = new VarLengthColumnPage[model.getNoDictionaryCount()];
+    noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPage.length; i++) {
-      noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+      noDictDimensionPage[i] = new ColumnPage(DataType.STRING, pageSize);
     }
     complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
     for (int i = 0; i < complexDimensionPage.length; i++) {
@@ -75,10 +77,10 @@ class TablePage {
       // we get the first row.
       complexDimensionPage[i] = null;
     }
-    measurePage = new FixLengthColumnPage[model.getMeasureCount()];
+    measurePage = new ColumnPage[model.getMeasureCount()];
     DataType[] dataTypes = model.getMeasureDataType();
     for (int i = 0; i < measurePage.length; i++) {
-      measurePage[i] = new FixLengthColumnPage(dataTypes[i], pageSize);
+      measurePage[i] = new ColumnPage(dataTypes[i], pageSize);
     }
   }
 
@@ -104,9 +106,9 @@ class TablePage {
       for (int i = 0; i < noDictAndComplex.length; i++) {
         if (i < noDictionaryCount) {
           // noDictionary columns, since it is variable length, we need to prepare each
-          // element as LV encoded byte array (first two bytes are the length of the array)
+          // element as LV result byte array (first two bytes are the length of the array)
           byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
-          noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
+          noDictDimensionPage[i].putData(rowId, valueWithLength);
         } else {
           // complex columns
           addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
@@ -121,13 +123,19 @@ class TablePage {
 
       // in compaction flow the measure with decimal type will come as Spark decimal.
       // need to convert it to byte array.
-      if (null != value && measurePage[i].getDataType() == DataType.DECIMAL && model
-          .isCompactionFlow()) {
+      if (measurePage[i].getDataType() == DataType.DECIMAL &&
+          model.isCompactionFlow() &&
+          value != null) {
         BigDecimal bigDecimal = ((Decimal) value).toJavaBigDecimal();
         value = DataTypeUtil.bigDecimalToByte(bigDecimal);
       }
       measurePage[i].putData(rowId, value);
     }
+
+    // update statistics if this is the last row
+    if (rowId + 1 == pageSize) {
+      this.measurePageStatistics = new MeasurePageStatsVO(measurePage);
+    }
   }
 
   /**
@@ -150,7 +158,7 @@ class TablePage {
     }
 
     int depthInComplexColumn = getComplexDimensionPage()[index].getDepth();
-    // this is the encoded columnar data which will be added to page,
+    // this is the result columnar data which will be added to page,
     // size of this list is the depth of complex column, we will fill it by input data
     List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
     for (int k = 0; k < depthInComplexColumn; k++) {
@@ -191,7 +199,7 @@ class TablePage {
     return keyColumnPage;
   }
 
-  public VarLengthColumnPage[] getNoDictDimensionPage() {
+  public ColumnPage[] getNoDictDimensionPage() {
     return noDictDimensionPage;
   }
 
@@ -199,7 +207,17 @@ class TablePage {
     return complexDimensionPage;
   }
 
-  public FixLengthColumnPage[] getMeasurePage() {
+  public ColumnPage[] getMeasurePage() {
     return measurePage;
   }
+
+  public MeasurePageStatsVO getMeasureStats() {
+    return measurePageStatistics;
+  }
+
+  public int getPageSize() {
+    return pageSize;
+  }
 }
+
+
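
The no-dictionary path stores each value in LV (length-value) form before putting it on the page. A minimal sketch of addLengthToByteArray, assuming the two-byte length prefix described in the comment above:

    private static byte[] addLengthToByteArray(byte[] value) {
      ByteBuffer buffer = ByteBuffer.allocate(2 + value.length);
      buffer.putShort((short) value.length);  // first two bytes hold the value length
      buffer.put(value);
      return buffer.array();
    }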

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
new file mode 100644
index 0000000..3cb4777
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.processing.util.NonDictionaryUtil;
+
+public class TablePageKey {
+  private int pageSize;
+
+  private byte[][] currentNoDictionaryKey;
+
+  // MDK start key
+  private byte[] startKey;
+
+  // MDK end key
+  private byte[] endKey;
+
+  // startkey for no dictionary columns
+  private byte[][] noDictStartKey;
+
+  // endkey for no dictionary columns
+  private byte[][] noDictEndKey;
+
+  // startkey for no dictionary columns after packing into one column
+  private byte[] packedNoDictStartKey;
+
+  // endkey for no dictionary columns after packing into one column
+  private byte[] packedNoDictEndKey;
+
+  private CarbonFactDataHandlerModel model;
+
+  TablePageKey(CarbonFactDataHandlerModel model, int pageSize) {
+    this.model = model;
+    this.pageSize = pageSize;
+  }
+
+  /** update all keys based on the input row */
+  void update(int rowId, CarbonRow row) throws KeyGenException {
+    byte[] currentMDKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+    if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
+      currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+    }
+    if (rowId == 0) {
+      startKey = currentMDKey;
+      noDictStartKey = currentNoDictionaryKey;
+    }
+    endKey = currentMDKey;
+    noDictEndKey = currentNoDictionaryKey;
+    if (rowId == pageSize - 1) {
+      finalizeKeys();
+    }
+  }
+
+  /** update all keys if SORT_COLUMNS option is used when creating table */
+  private void finalizeKeys() {
+    // If SORT_COLUMNS is used, the start/end keys may need updating since they may
+    // contain dictionary columns that are not in SORT_COLUMNS, which need to be removed
+    // from the start/end key
+    int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
+    if (numberOfDictSortColumns > 0) {
+      // if SORT_COLUMNS contain dictionary columns
+      int[] keySize = model.getSegmentProperties().getFixedLengthKeySplitter().getBlockKeySize();
+      if (keySize.length > numberOfDictSortColumns) {
+        // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
+        int newMdkLength = 0;
+        for (int i = 0; i < numberOfDictSortColumns; i++) {
+          newMdkLength += keySize[i];
+        }
+        byte[] newStartKeyOfSortKey = new byte[newMdkLength];
+        byte[] newEndKeyOfSortKey = new byte[newMdkLength];
+        System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
+        System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
+        startKey = newStartKeyOfSortKey;
+        endKey = newEndKeyOfSortKey;
+      }
+    } else {
+      startKey = new byte[0];
+      endKey = new byte[0];
+    }
+
+    // Do the same update for noDictionary start/end Key
+    int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
+    if (numberOfNoDictSortColumns > 0) {
+      // if sort_columns contain no-dictionary columns
+      if (noDictStartKey.length > numberOfNoDictSortColumns) {
+        byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
+        byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
+        System.arraycopy(
+            noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
+        System.arraycopy(
+            noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
+        noDictStartKey = newNoDictionaryStartKey;
+        noDictEndKey = newNoDictionaryEndKey;
+      }
+      packedNoDictStartKey =
+          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
+      packedNoDictEndKey =
+          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
+    }
+  }
+
+  public byte[] getStartKey() {
+    return startKey;
+  }
+
+  public byte[] getEndKey() {
+    return endKey;
+  }
+
+  public byte[] getNoDictStartKey() {
+    return packedNoDictStartKey;
+  }
+
+  public byte[] getNoDictEndKey() {
+    return packedNoDictEndKey;
+  }
+
+  public int getPageSize() {
+    return pageSize;
+  }
+}
\ No newline at end of file
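
A standalone sketch of the key truncation done in finalizeKeys() above: when some dictionary columns are not in SORT_COLUMNS, the MDK start/end keys are cut down to the prefix covering only the dictionary sort columns. The key sizes and values below are invented for illustration; this is not part of the patch.

// Sketch only: truncate an MDK byte array to the dictionary sort-column prefix.
import java.util.Arrays;

public class SortKeyTruncationDemo {
  public static void main(String[] args) {
    // assume 4 dictionary columns with block key sizes [2, 2, 4, 4],
    // of which only the first 2 are in SORT_COLUMNS
    int[] keySize = {2, 2, 4, 4};
    int numberOfDictSortColumns = 2;
    byte[] startKey = {1, 1, 2, 2, 3, 3, 3, 3, 4, 4, 4, 4};

    // sum the key sizes of the sort columns, as finalizeKeys() does
    int newMdkLength = 0;
    for (int i = 0; i < numberOfDictSortColumns; i++) {
      newMdkLength += keySize[i];
    }
    byte[] truncated = Arrays.copyOf(startKey, newMdkLength);
    System.out.println(Arrays.toString(truncated)); // prints [1, 1, 2, 2]
  }
}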

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
new file mode 100644
index 0000000..2911936
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.processing.store.writer.Encoder;
+
+// Statistics of the dimension and measure columns in a TablePage
+public class TablePageStatistics {
+
+  // number of dimension columns after complex columns are expanded
+  private int numDimensionsExpanded;
+
+  // min of each dimension column
+  private byte[][] dimensionMinValue;
+
+  // max of each dimension column
+  private byte[][] dimensionMaxValue;
+
+  // min of each measure column
+  private byte[][] measureMinValue;
+
+  // max of each measure column
+  private byte[][] measureMaxValue;
+
+  // null bit set for each measure column
+  private BitSet[] nullBitSet;
+
+  // measure stats
+  // TODO: there are redundant stats
+  private MeasurePageStatsVO measurePageStatistics;
+
+  private TableSpec tableSpec;
+
+  TablePageStatistics(TableSpec tableSpec, TablePage tablePage,
+      Encoder.EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
+    this.numDimensionsExpanded = tableSpec.getDimensionSpec().getNumExpandedDimensions();
+    int numMeasures = tableSpec.getMeasureSpec().getNumMeasures();
+    this.dimensionMinValue = new byte[numDimensionsExpanded][];
+    this.dimensionMaxValue = new byte[numDimensionsExpanded][];
+    this.measureMinValue = new byte[numMeasures][];
+    this.measureMaxValue = new byte[numMeasures][];
+    this.nullBitSet = new BitSet[numMeasures];
+    this.tableSpec = tableSpec;
+    this.measurePageStatistics = measurePageStatistics;
+    updateMinMax(tablePage, encodedData);
+    updateNullBitSet(tablePage);
+  }
+
+  private void updateMinMax(TablePage tablePage, Encoder.EncodedData encodedData) {
+    IndexStorage[] keyStorageArray = encodedData.indexStorages;
+    byte[][] measureArray = encodedData.measures;
+
+    for (int i = 0; i < numDimensionsExpanded; i++) {
+      switch (tableSpec.getDimensionSpec().getType(i)) {
+        case GLOBAL_DICTIONARY:
+        case DIRECT_DICTIONARY:
+        case COLUMN_GROUP:
+        case COMPLEX:
+          dimensionMinValue[i] = keyStorageArray[i].getMin();
+          dimensionMaxValue[i] = keyStorageArray[i].getMax();
+          break;
+        case PLAIN_VALUE:
+          dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
+          dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
+          break;
+      }
+    }
+    for (int i = 0; i < measureArray.length; i++) {
+      ColumnPageStatistics stats = tablePage.getMeasurePage()[i].getStatistics();
+      measureMaxValue[i] = stats.maxBytes();
+      measureMinValue[i] = stats.minBytes();
+    }
+  }
+
+  private void updateNullBitSet(TablePage tablePage) {
+    nullBitSet = new BitSet[tablePage.getMeasurePage().length];
+    ColumnPage[] measurePages = tablePage.getMeasurePage();
+    for (int i = 0; i < nullBitSet.length; i++) {
+      nullBitSet[i] = measurePages[i].getNullBitSet();
+    }
+  }
+
+  /**
+   * Strips the length prefix from a no-dictionary min/max value
+   * (stored as a 2-byte length followed by the actual bytes)
+   *
+   * @return the min/max value without its length prefix
+   */
+  private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
+    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
+    byte[] actualValue = new byte[buffer.getShort()];
+    buffer.get(actualValue);
+    return actualValue;
+  }
+
+  public byte[][] getDimensionMinValue() {
+    return dimensionMinValue;
+  }
+
+  public byte[][] getDimensionMaxValue() {
+    return dimensionMaxValue;
+  }
+
+  public byte[][] getMeasureMinValue() {
+    return measureMinValue;
+  }
+
+  public byte[][] getMeasureMaxValue() {
+    return measureMaxValue;
+  }
+
+  public BitSet[] getNullBitSet() {
+    return nullBitSet;
+  }
+
+  public MeasurePageStatsVO getMeasurePageStatistics() {
+    return measurePageStatistics;
+  }
+}
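
A standalone sketch of the length-prefixed layout that updateMinMaxForNoDictionary() strips: a no-dictionary min/max value carries a 2-byte length followed by the actual bytes. The sample value is made up; this is not part of the patch.

// Sketch only: build and then strip a length-prefixed value with java.nio.
import java.nio.ByteBuffer;
import java.util.Arrays;

public class LengthPrefixDemo {
  public static void main(String[] args) {
    // build a value with a short length prefix: length = 3, payload "abc"
    ByteBuffer out = ByteBuffer.allocate(2 + 3);
    out.putShort((short) 3).put(new byte[] {'a', 'b', 'c'});
    byte[] valueWithLength = out.array();

    // strip the prefix, mirroring updateMinMaxForNoDictionary()
    ByteBuffer in = ByteBuffer.wrap(valueWithLength);
    byte[] actualValue = new byte[in.getShort()];
    in.get(actualValue);
    System.out.println(Arrays.toString(actualValue)); // prints [97, 98, 99]
  }
}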

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
deleted file mode 100644
index 8fa8432..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.colgroup;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-
-/**
- * it is holder of column group data and also min max for colgroup block data
- */
-public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> {
-
-  private byte[][] data;
-
-  private ColGroupMinMax colGrpMinMax;
-
-  public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex, byte[][] data) {
-    colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex);
-    this.data = data;
-    for (int i = 0; i < data.length; i++) {
-      colGrpMinMax.add(data[i]);
-    }
-  }
-
-  /**
-   * sorting is not required for colgroup storage and hence return true
-   */
-  @Override public boolean isAlreadySorted() {
-    return true;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  @Override public ColGroupDataHolder getDataAfterComp() {
-    //not required for column group storage
-    return null;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  @Override public ColGroupDataHolder getIndexMap() {
-    // not required for column group storage
-    return null;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  @Override public byte[][] getKeyBlock() {
-    return data;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  @Override public ColGroupDataHolder getDataIndexMap() {
-    //not required for column group
-    return null;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  @Override public int getTotalSize() {
-    return data.length;
-  }
-
-  @Override public byte[] getMin() {
-    return colGrpMinMax.getMin();
-  }
-
-  @Override public byte[] getMax() {
-    return colGrpMinMax.getMax();
-  }
-
-  /**
-   * return self
-   */
-  @Override public IndexStorage call() throws Exception {
-    return this;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 0fc1d64..aaeaf66 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -39,8 +39,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
@@ -67,7 +66,6 @@ import org.apache.carbondata.format.BlockIndex;
 import org.apache.carbondata.format.BlockletInfo3;
 import org.apache.carbondata.format.IndexHeader;
 import org.apache.carbondata.processing.store.file.FileData;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.io.IOUtils;
@@ -575,59 +573,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    */
   public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
 
-  protected byte[][] fillAndCompressedKeyBlockData(IndexStorage[] keyStorageArray,
-      int entryCount) {
-    byte[][] keyBlockData = new byte[keyStorageArray.length][];
-    int destPos = 0;
-    int keyBlockSizePosition = -1;
-    for (int i = 0; i < keyStorageArray.length; i++) {
-      destPos = 0;
-      //handling for high card dims
-      if (!dataWriterVo.getIsComplexType()[i] && !dataWriterVo.getIsDictionaryColumn()[i]) {
-        int totalLength = 0;
-        // calc size of the total bytes in all the colmns.
-        for (int k = 0; k < keyStorageArray[i].getKeyBlock().length; k++) {
-          byte[] colValue = keyStorageArray[i].getKeyBlock()[k];
-          totalLength += colValue.length;
-        }
-        keyBlockData[i] = new byte[totalLength];
-
-        for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
-          int length = keyStorageArray[i].getKeyBlock()[j].length;
-          System
-              .arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos, length);
-          destPos += length;
-        }
-      } else {
-        keyBlockSizePosition++;
-        if (dataWriterVo.getAggBlocks()[i]) {
-          keyBlockData[i] = new byte[keyStorageArray[i].getTotalSize()];
-          for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
-            System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
-                keyStorageArray[i].getKeyBlock()[j].length);
-            destPos += keyStorageArray[i].getKeyBlock()[j].length;
-          }
-        } else {
-          if (dataWriterVo.getIsComplexType()[i]) {
-            keyBlockData[i] = new byte[keyStorageArray[i].getKeyBlock().length * dataWriterVo
-                .getKeyBlockSize()[keyBlockSizePosition]];
-          } else {
-            keyBlockData[i] =
-                new byte[entryCount * dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]];
-          }
-          for (int j = 0; j < keyStorageArray[i].getKeyBlock().length; j++) {
-            System.arraycopy(keyStorageArray[i].getKeyBlock()[j], 0, keyBlockData[i], destPos,
-                dataWriterVo.getKeyBlockSize()[keyBlockSizePosition]);
-            destPos += dataWriterVo.getKeyBlockSize()[keyBlockSizePosition];
-          }
-        }
-      }
-      keyBlockData[i] = CompressorFactory.getInstance().getCompressor()
-          .compressByte(keyBlockData[i]);
-    }
-    return keyBlockData;
-  }
-
   /**
    * Below method will be used to update the min or max value
    * by removing the length from it
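
The removed fillAndCompressedKeyBlockData() above flattened each column's per-row key bytes into one array and compressed it. A standalone sketch of that flatten-then-compress step follows; it uses the JDK Deflater purely as a stand-in compressor (Carbon obtains its compressor from CompressorFactory), and is not part of the patch.

// Sketch only: concatenate a column's key blocks, then compress the result.
import java.util.zip.Deflater;

public class KeyBlockCompressDemo {
  static byte[] concatAndCompress(byte[][] keyBlock) {
    // total size of all row values in this column
    int totalLength = 0;
    for (byte[] colValue : keyBlock) {
      totalLength += colValue.length;
    }
    // copy every row value into one flat array
    byte[] flat = new byte[totalLength];
    int destPos = 0;
    for (byte[] colValue : keyBlock) {
      System.arraycopy(colValue, 0, flat, destPos, colValue.length);
      destPos += colValue.length;
    }
    // compress the flattened column data (stand-in for Carbon's compressor)
    Deflater deflater = new Deflater();
    deflater.setInput(flat);
    deflater.finish();
    byte[] buffer = new byte[Math.max(64, flat.length * 2)];
    int compressedLength = deflater.deflate(buffer);
    deflater.end();
    byte[] compressed = new byte[compressedLength];
    System.arraycopy(buffer, 0, compressed, 0, compressedLength);
    return compressed;
  }

  public static void main(String[] args) {
    byte[][] keyBlock = {{1, 2}, {3, 4}, {5, 6}};
    System.out.println(concatAndCompress(keyBlock).length + " compressed bytes");
  }
}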

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index 8fbf9c0..defa23a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -36,9 +36,7 @@ public class CarbonDataWriterVo {
 
   private IFileManagerComposite fileManager;
 
-  private int[] keyBlockSize;
-
-  private boolean[] aggBlocks;
+  private boolean[] rleEncodingForDictDim;
 
   private boolean[] isComplexType;
 
@@ -123,31 +121,17 @@ public class CarbonDataWriterVo {
   }
 
   /**
-   * @return the keyBlockSize
-   */
-  public int[] getKeyBlockSize() {
-    return keyBlockSize;
-  }
-
-  /**
-   * @param keyBlockSize the keyBlockSize to set
-   */
-  public void setKeyBlockSize(int[] keyBlockSize) {
-    this.keyBlockSize = keyBlockSize;
-  }
-
-  /**
-   * @return the aggBlocks
+   * @return the rleEncodingForDictDim
    */
-  public boolean[] getAggBlocks() {
-    return aggBlocks;
+  public boolean[] getRleEncodingForDictDim() {
+    return rleEncodingForDictDim;
   }
 
   /**
-   * @param aggBlocks the aggBlocks to set
+   * @param rleEncodingForDictDim the rleEncodingForDictDim to set
    */
-  public void setAggBlocks(boolean[] aggBlocks) {
-    this.aggBlocks = aggBlocks;
+  public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) {
+    this.rleEncodingForDictDim = rleEncodingForDictDim;
   }
 
   /**
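
The aggBlocks -> rleEncodingForDictDim rename above makes the flag's intent explicit: it marks dictionary dimensions eligible for run-length encoding. A generic (value, run length) sketch over dictionary surrogate keys follows; Carbon's actual on-disk RLE layout may differ, and this is not part of the patch.

// Sketch only: run-length encode a sorted column of dictionary surrogate keys.
import java.util.ArrayList;
import java.util.List;

public class RleDemo {
  static List<int[]> runLengthEncode(int[] surrogates) {
    List<int[]> runs = new ArrayList<>();
    int i = 0;
    while (i < surrogates.length) {
      // extend the run while the value repeats
      int j = i;
      while (j < surrogates.length && surrogates[j] == surrogates[i]) {
        j++;
      }
      runs.add(new int[] {surrogates[i], j - i}); // (value, run length)
      i = j;
    }
    return runs;
  }

  public static void main(String[] args) {
    int[] sortedColumn = {1, 1, 1, 2, 2, 5};
    for (int[] run : runLengthEncode(sortedColumn)) {
      System.out.println(run[0] + " x " + run[1]); // 1 x 3, 2 x 2, 5 x 1
    }
  }
}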

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index c8f740b..8ee08c4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -17,33 +17,19 @@
 
 package org.apache.carbondata.processing.store.writer;
 
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.util.NodeHolder;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.store.TablePageKey;
+import org.apache.carbondata.processing.store.TablePageStatistics;
 
 public interface CarbonFactDataWriter<T> {
 
   /**
-   * This method will be used to write leaf data to file
-   * file format
-   * <key><measure1><measure2>....
-   *
-   * @param measureArray            measure array
-   * @param entryCount           number of entries
-   * @param startKey             start key of leaf
-   * @param endKey               end key of leaf
-   * @param noDictionaryEndKey
-   * @param noDictionaryStartKey
-   * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
+   * This method will be used to create a NodeHolder for a table page
    */
 
-  NodeHolder buildDataNodeHolder(IndexStorage<T>[] keyStorageArray, byte[][] measureArray,
-      int entryCount, byte[] startKey, byte[] endKey, WriterCompressModel compressionModel,
-      byte[] noDictionaryStartKey, byte[] noDictionaryEndKey, BitSet[] nullValueIndexBitSet)
-      throws CarbonDataWriterException;
+  NodeHolder buildDataNodeHolder(Encoder.EncodedData encoded, TablePageStatistics stats,
+      TablePageKey key) throws CarbonDataWriterException;
 
   /**
    * If node holder flag is enabled the object will be added to list

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java
new file mode 100644
index 0000000..c2d0214
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/Encoder.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store.writer;
+
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.processing.store.TablePage;
+
+public interface Encoder {
+
+  EncodedData encode(TablePage tablePage);
+
+  // encoded result of all columns
+  class EncodedData {
+    // dimension data that include rowid (index)
+    public IndexStorage[] indexStorages;
+
+    // encoded and compressed dimension data
+    public byte[][] dimensions;
+
+    // encoded and compressed measure data
+    public byte[][] measures;
+  }
+}
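
A simplified, self-contained analog of the new Encoder contract: one encode() call groups all per-column results, which then feed the new buildDataNodeHolder signature above. The types here are stand-ins for illustration only; the real interface consumes a TablePage and also fills IndexStorage[]. This is not part of the patch.

// Sketch only: the shape of encode() -> EncodedData with stand-in types.
public class EncoderShapeDemo {
  static class EncodedData {
    byte[][] dimensions; // encoded and compressed dimension columns
    byte[][] measures;   // encoded and compressed measure columns
  }

  interface Encoder {
    EncodedData encode(byte[][] dimensionColumns, byte[][] measureColumns);
  }

  public static void main(String[] args) {
    // identity "encoder"; real implementations apply RLE, inverted index,
    // and compression before filling the result
    Encoder encoder = (dims, msrs) -> {
      EncodedData result = new EncodedData();
      result.dimensions = dims;
      result.measures = msrs;
      return result;
    };
    EncodedData encoded = encoder.encode(
        new byte[][] {{1, 2}}, new byte[][] {{3, 4}});
    System.out.println(encoded.dimensions.length + " dim, "
        + encoded.measures.length + " msr columns");
  }
}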

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
deleted file mode 100644
index 9ac3481..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/exception/CarbonDataWriterException.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store.writer.exception;
-
-import java.util.Locale;
-
-public class CarbonDataWriterException extends RuntimeException {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonDataWriterException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public CarbonDataWriterException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-
-}