You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/05/27 12:50:20 UTC

[2/4] carbondata git commit: add TablePage

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
index 1f48324..e4b8a46 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java
@@ -375,9 +375,9 @@ public class SortParameters {
     parameters.setTaskNo(configuration.getTaskNo());
     parameters.setMeasureColCount(configuration.getMeasureCount());
     parameters.setDimColCount(
-        configuration.getDimensionCount() - configuration.getComplexDimensionCount());
+        configuration.getDimensionCount() - configuration.getComplexColumnCount());
     parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
-    parameters.setComplexDimColCount(configuration.getComplexDimensionCount());
+    parameters.setComplexDimColCount(configuration.getComplexColumnCount());
     parameters.setNoDictionaryDimnesionColumn(
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
     parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
@@ -466,8 +466,7 @@ public class SortParameters {
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE,
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
-    DataType[] measureDataType = CarbonDataProcessorUtil
-        .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields());
+    DataType[] measureDataType = configuration.getMeasureDataType();
     parameters.setMeasureDataType(measureDataType);
     return parameters;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 9a7450a..f165dcc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -17,17 +17,11 @@
 
 package org.apache.carbondata.processing.store;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -52,25 +46,21 @@ import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
 import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
-import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
 import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator;
-import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
 import org.apache.carbondata.processing.store.colgroup.ColGroupBlockStorage;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -79,8 +69,6 @@ import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
-import org.apache.spark.sql.types.Decimal;
-
 /**
  * Fact data handler class to handle the fact data
  */
@@ -91,6 +79,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonFactDataHandlerColumnar.class.getName());
+
+  private CarbonFactDataHandlerModel model;
+
   /**
    * data writer
    */
@@ -103,49 +94,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * total number of entries in blocklet
    */
   private int entryCount;
-  private Map<Integer, GenericDataType> complexIndexMap;
-  /**
-   * measure count
-   */
-  private int measureCount;
-  /**
-   * measure count
-   */
-  private int dimensionCount;
-  /**
-   * index of mdkey in incoming rows
-   */
-  private int mdKeyIndex;
+
   /**
    * blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process
    * once this size of input is reached
    */
   private int blockletSize;
   /**
-   * mdkeyLength
-   */
-  private int mdkeyLength;
-  /**
-   * storeLocation
-   */
-  private String storeLocation;
-  /**
-   * databaseName
-   */
-  private String databaseName;
-  /**
-   * tableName
-   */
-  private String tableName;
-  /**
-   * table block size in MB
-   */
-  private int tableBlockSize;
-  /**
-   * dimLens
-   */
-  private int[] dimLens;
-  /**
    * keyGenerator
    */
   private ColumnarSplitter columnarSplitter;
@@ -156,23 +111,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private boolean[] aggKeyBlock;
   private boolean[] isNoDictionary;
   private long processedDataCount;
-  private KeyGenerator[] complexKeyGenerator;
   private ExecutorService producerExecutorService;
   private List<Future<Void>> producerExecutorServiceTaskList;
   private ExecutorService consumerExecutorService;
   private List<Future<Void>> consumerExecutorServiceTaskList;
-  private List<Object[]> dataRows;
-  private int noDictionaryCount;
+  private List<CarbonRow> dataRows;
   private ColumnGroupModel colGrpModel;
-  private int[] primitiveDimLens;
-  private DataType[] type;
-  private int[] completeDimLens;
   private boolean[] isUseInvertedIndex;
   /**
-   * data file attributes which will used for file construction
-   */
-  private CarbonDataFileAttributes carbonDataFileAttributes;
-  /**
    * semaphore which will used for managing node holder objects
    */
   private Semaphore semaphore;
@@ -197,19 +143,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * flag to check whether all blocklets have been finished writing
    */
   private boolean processingComplete;
-  /**
-   * data directory location in carbon store path
-   */
-  private String carbonDataDirectoryPath;
-  /**
-   * no of complex dimensions
-   */
-  private int complexColCount;
-
-  /**
-   * column schema present in the table
-   */
-  private List<ColumnSchema> wrapperColumnSchemaList;
 
   /**
    * boolean to check whether dimension
@@ -217,24 +150,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private boolean[] isDictDimension;
 
-  /**
-   * colCardinality for the merge case.
-   */
-  private int[] colCardinality;
-
-  /**
-   * Segment properties
-   */
-  private SegmentProperties segmentProperties;
-  /**
-   * flag to check for compaction flow
-   */
-  private boolean compactionFlow;
-
   private int bucketNumber;
 
-  private long schemaUpdatedTimeStamp;
-
   private int taskExtension;
 
   /**
@@ -245,24 +162,21 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   /**
    * CarbonFactDataHandler constructor
    */
-  public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
-    initParameters(carbonFactDataHandlerModel);
-    this.dimensionCount = carbonFactDataHandlerModel.getDimensionCount();
-    this.complexIndexMap = carbonFactDataHandlerModel.getComplexIndexMap();
-    this.primitiveDimLens = carbonFactDataHandlerModel.getPrimitiveDimLens();
-    this.carbonDataDirectoryPath = carbonFactDataHandlerModel.getCarbonDataDirectoryPath();
-    this.complexColCount = getExpandedComplexColsCount();
-
-    int numDimColumns = colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexColCount;
+  public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) {
+    this.model = model;
+    initParameters(model);
+
+    int numDimColumns = colGrpModel.getNoOfColumnStore() + model.getNoDictionaryCount()
+        + getExpandedComplexColsCount();
     this.aggKeyBlock = new boolean[numDimColumns];
     this.isNoDictionary = new boolean[numDimColumns];
-    this.bucketNumber = carbonFactDataHandlerModel.getBucketId();
-    this.taskExtension = carbonFactDataHandlerModel.getTaskExtension();
+    this.bucketNumber = model.getBucketId();
+    this.taskExtension = model.getTaskExtension();
     this.isUseInvertedIndex = new boolean[numDimColumns];
-    if (null != carbonFactDataHandlerModel.getIsUseInvertedIndex()) {
+    if (null != model.getIsUseInvertedIndex()) {
       for (int i = 0; i < isUseInvertedIndex.length; i++) {
-        if (i < carbonFactDataHandlerModel.getIsUseInvertedIndex().length) {
-          isUseInvertedIndex[i] = carbonFactDataHandlerModel.getIsUseInvertedIndex()[i];
+        if (i < model.getIsUseInvertedIndex().length) {
+          isUseInvertedIndex[i] = model.getIsUseInvertedIndex()[i];
         } else {
           isUseInvertedIndex[i] = true;
         }
@@ -270,20 +184,23 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
     int noDictStartIndex = this.colGrpModel.getNoOfColumnStore();
     // setting true value for dims of high card
-    for (int i = 0; i < noDictionaryCount; i++) {
+    for (int i = 0; i < model.getNoDictionaryCount(); i++) {
       this.isNoDictionary[noDictStartIndex + i] = true;
     }
 
-    boolean isAggKeyBlock = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
+    boolean isAggKeyBlock = Boolean.parseBoolean(
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK,
             CarbonCommonConstants.AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE));
     if (isAggKeyBlock) {
-      int noDictionaryValue = Integer.parseInt(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
+      int noDictionaryValue = Integer.parseInt(
+          CarbonProperties.getInstance().getProperty(
+              CarbonCommonConstants.HIGH_CARDINALITY_VALUE,
               CarbonCommonConstants.HIGH_CARDINALITY_VALUE_DEFAULTVALUE));
       int[] columnSplits = colGrpModel.getColumnSplit();
       int dimCardinalityIndex = 0;
       int aggIndex = 0;
+      int[] dimLens = model.getSegmentProperties().getDimColumnsCardinality();
       for (int i = 0; i < columnSplits.length; i++) {
         if (colGrpModel.isColumnar(i) && dimLens[dimCardinalityIndex] < noDictionaryValue) {
           this.aggKeyBlock[aggIndex++] = true;
@@ -293,11 +210,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         aggIndex++;
       }
 
-      if (dimensionCount < dimLens.length) {
-        int allColsCount = getColsCount(dimensionCount);
+      if (model.getDimensionCount() < dimLens.length) {
+        int allColsCount = getColsCount(model.getDimensionCount());
         List<Boolean> aggKeyBlockWithComplex = new ArrayList<Boolean>(allColsCount);
-        for (int i = 0; i < dimensionCount; i++) {
-          GenericDataType complexDataType = complexIndexMap.get(i);
+        for (int i = 0; i < model.getDimensionCount(); i++) {
+          GenericDataType complexDataType = model.getComplexIndexMap().get(i);
           if (complexDataType != null) {
             complexDataType.fillAggKeyBlock(aggKeyBlockWithComplex, this.aggKeyBlock);
           } else {
@@ -314,35 +231,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     version = CarbonProperties.getInstance().getFormatVersion();
   }
 
-  private void initParameters(CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
-    this.databaseName = carbonFactDataHandlerModel.getDatabaseName();
-    this.tableBlockSize = carbonFactDataHandlerModel.getBlockSizeInMB();
-    this.tableName = carbonFactDataHandlerModel.getTableName();
-    this.type = carbonFactDataHandlerModel.getMeasureDataType();
-    this.segmentProperties = carbonFactDataHandlerModel.getSegmentProperties();
-    this.wrapperColumnSchemaList = carbonFactDataHandlerModel.getWrapperColumnSchema();
-    this.colCardinality = carbonFactDataHandlerModel.getColCardinality();
-    this.storeLocation = carbonFactDataHandlerModel.getStoreLocation();
-    this.measureCount = carbonFactDataHandlerModel.getMeasureCount();
-    this.mdkeyLength = carbonFactDataHandlerModel.getMdKeyLength();
-    this.mdKeyIndex = carbonFactDataHandlerModel.getMdKeyIndex();
-    this.noDictionaryCount = carbonFactDataHandlerModel.getNoDictionaryCount();
-    this.colGrpModel = segmentProperties.getColumnGroupModel();
-    this.completeDimLens = carbonFactDataHandlerModel.getDimLens();
-    this.dimLens = this.segmentProperties.getDimColumnsCardinality();
-    this.carbonDataFileAttributes = carbonFactDataHandlerModel.getCarbonDataFileAttributes();
-    this.schemaUpdatedTimeStamp = carbonFactDataHandlerModel.getSchemaUpdatedTimeStamp();
+  private void initParameters(CarbonFactDataHandlerModel model) {
+
+    this.colGrpModel = model.getSegmentProperties().getColumnGroupModel();
 
     //TODO need to pass carbon table identifier to metadata
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonTable carbonTable =
+        CarbonMetadata.getInstance().getCarbonTable(
+            model.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + model.getTableName());
     isDictDimension =
-        CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(tableName));
+        CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(model.getTableName()));
 
-    this.compactionFlow = carbonFactDataHandlerModel.isCompactionFlow();
     // in compaction flow the measure with decimal type will come as spark decimal.
     // need to convert it to byte array.
-    if (compactionFlow) {
+    if (model.isCompactionFlow()) {
       try {
         numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
@@ -397,7 +299,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private void setComplexMapSurrogateIndex(int dimensionCount) {
     int surrIndex = 0;
     for (int i = 0; i < dimensionCount; i++) {
-      GenericDataType complexDataType = complexIndexMap.get(i);
+      GenericDataType complexDataType = model.getComplexIndexMap().get(i);
       if (complexDataType != null) {
         List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>();
         complexDataType.getAllPrimitiveChildren(primitiveTypes);
@@ -418,7 +320,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   public void initialise() throws CarbonDataWriterException {
     fileManager = new FileManager();
-    fileManager.setName(new File(this.storeLocation).getName());
+    fileManager.setName(new File(model.getStoreLocation()).getName());
     setWritingConfiguration();
   }
 
@@ -428,11 +330,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * @param row
    * @throws CarbonDataWriterException
    */
-  public void addDataToStore(Object[] row) throws CarbonDataWriterException {
+  public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
     dataRows.add(row);
     this.entryCount++;
-    // if entry count reaches to leaf node size then we are ready to
-    // write
+    // if entry count reaches to leaf node size then we are ready to write
     // this to leaf node file and update the intermediate files
     if (this.entryCount == this.blockletSize) {
       try {
@@ -472,10 +373,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     }
 
     /** update all keys based on the input row */
-    void update(int rowId, Object[] row) {
-      currentMDKey = (byte[]) row[mdKeyIndex];
-      if (noDictionaryCount > 0 || complexIndexMap.size() > 0) {
-        currentNoDictionaryKey = (byte[][]) row[mdKeyIndex - 1];
+    void update(int rowId, CarbonRow row) throws KeyGenException {
+      currentMDKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+      if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
+        currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       }
       if (rowId == 0) {
         startKey = currentMDKey;
@@ -493,7 +394,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       // If SORT_COLUMNS is used, may need to update start/end keys since the they may
       // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
       // start/end key
-      int numberOfDictSortColumns = segmentProperties.getNumberOfDictSortColumns();
+      int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
       if (numberOfDictSortColumns > 0) {
         // if SORT_COLUMNS contain dictionary columns
         int[] keySize = columnarSplitter.getBlockKeySize();
@@ -516,7 +417,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       }
 
       // Do the same update for noDictionary start/end Key
-      int numberOfNoDictSortColumns = segmentProperties.getNumberOfNoDictSortColumns();
+      int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
       if (numberOfNoDictSortColumns > 0) {
         // if sort_columns contain no-dictionary columns
         if (noDictStartKey.length > numberOfNoDictSortColumns) {
@@ -538,161 +439,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
-   * Represent a page data for all columns, we store its data in columnar layout, so that
-   * all processing apply to TablePage can be done in vectorized fashion.
-   */
-  class TablePage {
-
-    // For all dimension and measure columns, we store the column data directly in the page,
-    // the length of the page is the number of rows.
-
-    // TODO: we should have separate class for key columns so that keys are stored together in
-    // one vector to make it efficient for sorting
-    VarLengthColumnPage[] dictDimensionPage;
-    VarLengthColumnPage[] noDictDimensionPage;
-    ComplexColumnPage[] complexDimensionPage;
-    FixLengthColumnPage[] measurePage;
-
-    // the num of rows in this page, it must be less than short value (65536)
-    int pageSize;
-
-    TablePage(int pageSize) {
-      this.pageSize = pageSize;
-      dictDimensionPage = new VarLengthColumnPage[dimensionCount];
-      for (int i = 0; i < dictDimensionPage.length; i++) {
-        dictDimensionPage[i] = new VarLengthColumnPage(pageSize);
-      }
-      noDictDimensionPage = new VarLengthColumnPage[noDictionaryCount];
-      for (int i = 0; i < noDictDimensionPage.length; i++) {
-        noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
-      }
-      complexDimensionPage = new ComplexColumnPage[getComplexColumnCount()];
-      for (int i = 0; i < complexDimensionPage.length; i++) {
-        // here we still do not the depth of the complex column, it will be initialized when
-        // we get the first row.
-        complexDimensionPage[i] = null;
-      }
-      measurePage = new FixLengthColumnPage[measureCount];
-      for (int i = 0; i < measurePage.length; i++) {
-        measurePage[i] = new FixLengthColumnPage(type[i], pageSize);
-      }
-    }
-
-    /**
-     * Add one row to the internal store, it will be converted into columnar layout
-     * @param rowId Id of the input row
-     * @param rows row object
-     */
-    void addRow(int rowId, Object[] rows) {
-
-      // convert dictionary columns
-      byte[] MDKey = (byte[]) rows[mdKeyIndex];
-      if (columnarSplitter != null) {
-        byte[][] splitKey = columnarSplitter.splitKey(MDKey);
-        for (int i = 0; i < splitKey.length; i++) {
-          dictDimensionPage[i].putByteArray(rowId, splitKey[i]);
-        }
-      }
-
-      // convert noDictionary columns and complex columns.
-      if (noDictionaryCount > 0 || complexColCount > 0) {
-        byte[][] noDictAndComplex = (byte[][])(rows[mdKeyIndex - 1]);
-        for (int i = 0; i < noDictAndComplex.length; i++) {
-          if (i < noDictionaryCount) {
-            // noDictionary columns, since it is variable length, we need to prepare each
-            // element as LV encoded byte array (first two bytes are the length of the array)
-            byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
-            noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
-          } else {
-            // complex columns
-            addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
-          }
-        }
-      }
-
-      // convert measure columns
-      for (int i = 0; i < type.length; i++) {
-        Object value = rows[i];
-
-        // in compaction flow the measure with decimal type will come as spark decimal.
-        // need to convert it to byte array.
-        if (type[i] == DataType.DECIMAL && compactionFlow) {
-          BigDecimal bigDecimal = ((Decimal) rows[i]).toJavaBigDecimal();
-          value = DataTypeUtil.bigDecimalToByte(bigDecimal);
-        }
-        measurePage[i].putData(rowId, value);
-      }
-    }
-
-    /**
-     * add a complex column into internal member compleDimensionPage
-     * @param index index of the complexDimensionPage
-     * @param rowId Id of the input row
-     * @param complexColumns byte array the complex columm to be added, extracted of input row
-     */
-    // TODO: this function should be refactoried, ColumnPage should support complex type encoding
-    // directly instead of doing it here
-    private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
-      GenericDataType complexDataType = complexIndexMap.get(index + primitiveDimLens.length);
-
-      // initialize the page if first row
-      if (rowId == 0) {
-        int depthInComplexColumn = complexDataType.getColsCount();
-        complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
-      }
-
-      int depthInComplexColumn = complexDimensionPage[index].getDepth();
-      // this is the encoded columnar data which will be added to page,
-      // size of this list is the depth of complex column, we will fill it by input data
-      List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
-      for (int k = 0; k < depthInComplexColumn; k++) {
-        encodedComplexColumnar.add(new ArrayList<byte[]>());
-      }
-
-      // encode the complex type data and fill columnsArray
-      try {
-        ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
-        ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
-        DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
-        complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream, complexKeyGenerator);
-        complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
-            ByteBuffer.wrap(byteArrayOutput.toByteArray()));
-        byteArrayOutput.close();
-      } catch (IOException | KeyGenException e) {
-        throw new CarbonDataWriterException(
-            "Problem while bit packing and writing complex datatype", e);
-      }
-
-      for (int depth = 0; depth < depthInComplexColumn; depth++) {
-        complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
-      }
-    }
-
-    // Adds length as a short element (first 2 bytes) to the head of the input byte array
-    private byte[] addLengthToByteArray(byte[] input) {
-      byte[] output = new byte[input.length + 2];
-      ByteBuffer buffer = ByteBuffer.wrap(output);
-      buffer.putShort((short) input.length);
-      buffer.put(input, 0, input.length);
-      return output;
-    }
-
-  }
-
-  /**
    * generate the NodeHolder from the input rows (one page in case of V3 format)
    */
-  private NodeHolder processDataRows(List<Object[]> dataRows)
-      throws CarbonDataWriterException {
+  private NodeHolder processDataRows(List<CarbonRow> dataRows)
+      throws CarbonDataWriterException, KeyGenException {
     if (dataRows.size() == 0) {
       return new NodeHolder();
     }
-    TablePage tablePage = new TablePage(dataRows.size());
+    TablePage tablePage = new TablePage(model, dataRows.size());
     IndexKey keys = new IndexKey(dataRows.size());
     int rowId = 0;
 
     // convert row to columnar data
-    for (Object[] row : dataRows) {
+    for (CarbonRow row : dataRows) {
       tablePage.addRow(rowId, row);
       keys.update(rowId, row);
       rowId++;
@@ -702,14 +461,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     // TODO: To make the encoding more transparent to the user, user should be enable to specify
     // the encoding and compression method for each type when creating table.
 
-    Codec codec = new Codec();
+    Codec codec = new Codec(model.getMeasureDataType());
     IndexStorage[] dimColumns = codec.encodeAndCompressDimensions(tablePage);
     Codec encodedMeasure = codec.encodeAndCompressMeasures(tablePage);
 
     // prepare nullBitSet for writer, remove this after writer can accept TablePage
-    BitSet[] nullBitSet = new BitSet[tablePage.measurePage.length];
+    BitSet[] nullBitSet = new BitSet[tablePage.getMeasurePage().length];
+    FixLengthColumnPage[] measurePages = tablePage.getMeasurePage();
     for (int i = 0; i < nullBitSet.length; i++) {
-      nullBitSet[i] = tablePage.measurePage[i].getNullBitSet();
+      nullBitSet[i] = measurePages[i].getNullBitSet();
     }
 
     LOGGER.info("Number Of records processed: " + dataRows.size());
@@ -767,10 +527,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     for (int i = 0; i < taskList.size(); i++) {
       try {
         taskList.get(i).get();
-      } catch (InterruptedException e) {
-        LOGGER.error(e, e.getMessage());
-        throw new CarbonDataWriterException(e.getMessage(), e);
-      } catch (ExecutionException e) {
+      } catch (InterruptedException | ExecutionException e) {
         LOGGER.error(e, e.getMessage());
         throw new CarbonDataWriterException(e.getMessage(), e);
       }
@@ -780,7 +537,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private int getColsCount(int columnSplit) {
     int count = 0;
     for (int i = 0; i < columnSplit; i++) {
-      GenericDataType complexDataType = complexIndexMap.get(i);
+      GenericDataType complexDataType = model.getComplexIndexMap().get(i);
       if (complexDataType != null) {
         count += complexDataType.getColsCount();
       } else count++;
@@ -791,8 +548,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   // return the number of complex column after complex columns are expanded
   private int getExpandedComplexColsCount() {
     int count = 0;
-    for (int i = 0; i < dimensionCount; i++) {
-      GenericDataType complexDataType = complexIndexMap.get(i);
+    int dictDimensionCount = model.getDimensionCount();
+    for (int i = 0; i < dictDimensionCount; i++) {
+      GenericDataType complexDataType = model.getComplexIndexMap().get(i);
       if (complexDataType != null) {
         count += complexDataType.getColsCount();
       }
@@ -802,7 +560,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
   // return the number of complex column
   private int getComplexColumnCount() {
-    return complexIndexMap.size();
+    return model.getComplexIndexMap().size();
   }
 
   /**
@@ -850,27 +608,22 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE);
     // if atleast one dimension is present then initialize column splitter otherwise null
     int noOfColStore = colGrpModel.getNoOfColumnStore();
-    int[] keyBlockSize = new int[noOfColStore + complexColCount];
+    int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()];
 
-    if (dimLens.length > 0) {
+    if (model.getDimLens().length > 0) {
       //Using Variable length variable split generator
       //This will help in splitting mdkey to columns. variable split is required because all
       // columns which are part of
       //row store will be in single column store
       //e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension
       //than below splitter will return column as {0,1,2}{3}{4}{5}
-      this.columnarSplitter = this.segmentProperties.getFixedLengthKeySplitter();
+      this.columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter();
       System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore);
       this.keyBlockHolder =
           new CarbonKeyBlockHolder[this.columnarSplitter.getBlockKeySize().length];
     } else {
       this.keyBlockHolder = new CarbonKeyBlockHolder[0];
     }
-    this.complexKeyGenerator = new KeyGenerator[completeDimLens.length];
-    for (int i = 0; i < completeDimLens.length; i++) {
-      complexKeyGenerator[i] =
-          KeyGeneratorFactory.getKeyGenerator(new int[] { completeDimLens[i] });
-    }
 
     for (int i = 0; i < keyBlockHolder.length; i++) {
       this.keyBlockHolder[i] = new CarbonKeyBlockHolder(blockletSize);
@@ -882,6 +635,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     List<Integer> customMeasureIndexList =
         new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    DataType[] type = model.getMeasureDataType();
     for (int j = 0; j < type.length; j++) {
       if (type[j] != DataType.BYTE && type[j] != DataType.DECIMAL) {
         otherMeasureIndexList.add(j);
@@ -898,9 +652,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     for (int i = 0; i < customMeasureIndex.length; i++) {
       customMeasureIndex[i] = customMeasureIndexList.get(i);
     }
-    setComplexMapSurrogateIndex(this.dimensionCount);
+    setComplexMapSurrogateIndex(model.getDimensionCount());
     int[] blockKeySize = getBlockKeySizeWithComplexTypes(new MultiDimKeyVarLengthEquiSplitGenerator(
-        CarbonUtil.getIncrementedCardinalityFullyFilled(completeDimLens.clone()), (byte) dimSet)
+        CarbonUtil.getIncrementedCardinalityFullyFilled(model.getDimLens().clone()), (byte) dimSet)
         .getBlockKeySize());
     System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore,
         blockKeySize.length - noOfColStore);
@@ -924,8 +678,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
     List<Integer> blockKeySizeWithComplex =
         new ArrayList<Integer>(blockKeySizeWithComplexTypes.length);
-    for (int i = 0; i < this.dimensionCount; i++) {
-      GenericDataType complexDataType = complexIndexMap.get(i);
+    int dictDimensionCount = model.getDimensionCount();
+    for (int i = 0; i < dictDimensionCount; i++) {
+      GenericDataType complexDataType = model.getComplexIndexMap().get(i);
       if (complexDataType != null) {
         complexDataType.fillBlockKeySize(blockKeySizeWithComplex, primitiveBlockKeySize);
       } else {
@@ -958,37 +713,37 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) {
     CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo();
-    carbonDataWriterVo.setStoreLocation(storeLocation);
-    carbonDataWriterVo.setMeasureCount(measureCount);
-    carbonDataWriterVo.setMdKeyLength(mdkeyLength);
-    carbonDataWriterVo.setTableName(tableName);
+    carbonDataWriterVo.setStoreLocation(model.getStoreLocation());
+    carbonDataWriterVo.setMeasureCount(model.getMeasureCount());
+    carbonDataWriterVo.setTableName(model.getTableName());
     carbonDataWriterVo.setKeyBlockSize(keyBlockSize);
     carbonDataWriterVo.setFileManager(fileManager);
     carbonDataWriterVo.setAggBlocks(aggKeyBlock);
     carbonDataWriterVo.setIsComplexType(isComplexTypes());
-    carbonDataWriterVo.setNoDictionaryCount(noDictionaryCount);
-    carbonDataWriterVo.setCarbonDataFileAttributes(carbonDataFileAttributes);
-    carbonDataWriterVo.setDatabaseName(databaseName);
-    carbonDataWriterVo.setWrapperColumnSchemaList(wrapperColumnSchemaList);
+    carbonDataWriterVo.setNoDictionaryCount(model.getNoDictionaryCount());
+    carbonDataWriterVo.setCarbonDataFileAttributes(model.getCarbonDataFileAttributes());
+    carbonDataWriterVo.setDatabaseName(model.getDatabaseName());
+    carbonDataWriterVo.setWrapperColumnSchemaList(model.getWrapperColumnSchema());
     carbonDataWriterVo.setIsDictionaryColumn(isDictDimension);
-    carbonDataWriterVo.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
-    carbonDataWriterVo.setColCardinality(colCardinality);
-    carbonDataWriterVo.setSegmentProperties(segmentProperties);
-    carbonDataWriterVo.setTableBlocksize(tableBlockSize);
+    carbonDataWriterVo.setCarbonDataDirectoryPath(model.getCarbonDataDirectoryPath());
+    carbonDataWriterVo.setColCardinality(model.getColCardinality());
+    carbonDataWriterVo.setSegmentProperties(model.getSegmentProperties());
+    carbonDataWriterVo.setTableBlocksize(model.getBlockSizeInMB());
     carbonDataWriterVo.setBucketNumber(bucketNumber);
     carbonDataWriterVo.setTaskExtension(taskExtension);
-    carbonDataWriterVo.setSchemaUpdatedTimeStamp(schemaUpdatedTimeStamp);
+    carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp());
     return carbonDataWriterVo;
   }
 
   private boolean[] isComplexTypes() {
-    int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + complexIndexMap.size();
+    int noDictionaryCount = model.getNoDictionaryCount();
+    int noOfColumn = colGrpModel.getNoOfColumnStore() + noDictionaryCount + getComplexColumnCount();
     int allColsCount = getColsCount(noOfColumn);
     boolean[] isComplexType = new boolean[allColsCount];
 
     List<Boolean> complexTypesList = new ArrayList<Boolean>(allColsCount);
     for (int i = 0; i < noOfColumn; i++) {
-      GenericDataType complexDataType = complexIndexMap.get(i - noDictionaryCount);
+      GenericDataType complexDataType = model.getComplexIndexMap().get(i - noDictionaryCount);
       if (complexDataType != null) {
         int count = complexDataType.getColsCount();
         for (int j = 0; j < count; j++) {
@@ -1081,11 +836,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   private final class Producer implements Callable<Void> {
 
     private BlockletDataHolder blockletDataHolder;
-    private List<Object[]> dataRows;
+    private List<CarbonRow> dataRows;
     private int sequenceNumber;
     private boolean isWriteAll;
 
-    private Producer(BlockletDataHolder blockletDataHolder, List<Object[]> dataRows,
+    private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
         int sequenceNumber, boolean isWriteAll) {
       this.blockletDataHolder = blockletDataHolder;
       this.dataRows = dataRows;
@@ -1176,17 +931,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       this.isUseInvertedIndex = isUseInvertedIndex;
     }
 
-    public BlockSortThread(int index, byte[][] data, boolean b, boolean isNoDictionary,
+    public BlockSortThread(byte[][] data, boolean compression, boolean isNoDictionary,
         boolean isSortRequired, boolean isUseInvertedIndex) {
-      this.index = index;
       this.data = data;
-      isCompressionReq = b;
+      this.isCompressionReq = compression;
       this.isNoDictionary = isNoDictionary;
       this.isSortRequired = isSortRequired;
       this.isUseInvertedIndex = isUseInvertedIndex;
     }
 
     @Override public IndexStorage call() throws Exception {
+      if (index == 1) {
+        int dd = 1 + 1;
+      }
       if (isUseInvertedIndex) {
         if (version == ColumnarFormatVersion.V3) {
           return new BlockIndexerStorageForShort(this.data, isCompressionReq, isNoDictionary,
@@ -1210,8 +967,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   public class Codec {
     private WriterCompressModel compressionModel;
     private byte[][] encodedMeasureArray;
+    private DataType[] measureType;
 
-    Codec() {
+    Codec(DataType[] measureType) {
+      this.measureType = measureType;
     }
 
     public WriterCompressModel getCompressionModel() {
@@ -1225,7 +984,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     public Codec encodeAndCompressMeasures(TablePage tablePage) {
       // TODO: following conversion is required only because compress model requires them,
       // remove then after the compress framework is refactoried
-      FixLengthColumnPage[] measurePage = tablePage.measurePage;
+      FixLengthColumnPage[] measurePage = tablePage.getMeasurePage();
       int measureCount = measurePage.length;
       Object[] min = new Object[measurePage.length];
       Object[] max = new Object[measurePage.length];
@@ -1238,8 +997,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
         decimal[i] = measurePage[i].getStatistics().getDecimal();
       }
       // encode and compress measure column page
-      compressionModel = ValueCompressionUtil
-          .getWriterCompressModel(max, min, decimal, uniqueValue, type, new byte[measureCount]);
+      compressionModel =
+          ValueCompressionUtil.getWriterCompressModel(max, min, decimal, uniqueValue, measureType,
+              new byte[measureCount]);
       encodedMeasureArray = encodeMeasure(compressionModel, measurePage);
       return this;
     }
@@ -1295,8 +1055,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      * Encode and compress each column page. The work is done using a thread pool.
      */
     private IndexStorage[] encodeAndCompressDimensions(TablePage tablePage) {
-      int noDictionaryCount = tablePage.noDictDimensionPage.length;
-      int complexColCount = tablePage.complexDimensionPage.length;
+      int noDictionaryCount = tablePage.getNoDictDimensionPage().length;
+      int complexColCount = tablePage.getComplexDimensionPage().length;
 
       // thread pool size to be used for encoding dimension
       // each thread will sort the column page data and compress it
@@ -1306,12 +1066,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       ExecutorService executorService = Executors.newFixedThreadPool(thread_pool_size);
       Callable<IndexStorage> callable;
       List<Future<IndexStorage>> submit = new ArrayList<Future<IndexStorage>>(
-          primitiveDimLens.length + noDictionaryCount + complexColCount);
+          model.getPrimitiveDimLens().length + noDictionaryCount + complexColCount);
       int i = 0;
       int dictionaryColumnCount = -1;
       int noDictionaryColumnCount = -1;
       int colGrpId = -1;
       boolean isSortColumn = false;
+      SegmentProperties segmentProperties = model.getSegmentProperties();
       for (i = 0; i < isDictDimension.length; i++) {
         isSortColumn = i < segmentProperties.getNumberOfSortColumns();
         if (isDictDimension[i]) {
@@ -1320,8 +1081,9 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             // dictionary dimension
             callable =
                 new BlockSortThread(
-                    i,
-                    tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage(),
+                    tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount),
+                    true,
+                    false,
                     isSortColumn,
                     isUseInvertedIndex[i] & isSortColumn);
 
@@ -1330,14 +1092,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
             callable = new ColGroupBlockStorage(
                 segmentProperties,
                 ++colGrpId,
-                tablePage.dictDimensionPage[dictionaryColumnCount].getByteArrayPage());
+                tablePage.getKeyColumnPage().getKeyVector(dictionaryColumnCount));
           }
         } else {
           // no dictionary dimension
           callable =
               new BlockSortThread(
-                  i,
-                  tablePage.noDictDimensionPage[++noDictionaryColumnCount].getByteArrayPage(),
+                  tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
                   false,
                   true,
                   isSortColumn,
@@ -1349,12 +1110,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
 
       // complex type column
       for (int index = 0; index < getComplexColumnCount(); index++) {
-        Iterator<byte[][]> iterator = tablePage.complexDimensionPage[index].iterator();
+        Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[index].iterator();
         while (iterator.hasNext()) {
+          byte[][] data = iterator.next();
           callable =
               new BlockSortThread(
                   i++,
-                  iterator.next(),
+                  data,
                   false,
                   true);
           submit.add(executorService.submit(callable));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 44958a4..df27dcc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -43,9 +43,8 @@ import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-/**
- * This class contains all the data required for processing and writing the carbon data
- */
+// This class contains all the data required for processing and writing the carbon data
+// TODO: we should try to minimize this class while refactoring the loading process
 public class CarbonFactDataHandlerModel {
 
   /**
@@ -74,59 +73,25 @@ public class CarbonFactDataHandlerModel {
    */
   private int measureCount;
   /**
-   * length of mdKey
-   */
-  private int mdKeyLength;
-  /**
-   * mdKey index in one row object
-   */
-  private int mdKeyIndex;
-  /**
-   * aggregators (e,g min, amx, sum)
-   */
-  private String[] aggregators;
-  /**
-   * custom aggregator class which contains the logic of merging data
-   */
-  private String[] aggregatorClass;
-  /**
    * local store location
    */
   private String storeLocation;
   /**
-   * cardinality of all dimensions
-   */
-  private int[] factDimLens;
-  /**
-   * flag to check whether to merge data based on custom aggregator
-   */
-  private boolean isMergingRequestForCustomAgg;
-  /**
    * flag to check whether use inverted index
    */
   private boolean[] isUseInvertedIndex;
+
   /**
-   * dimension cardinality
+   * length of each dimension, including dictionary, no-dictionary and complex dimensions
    */
   private int[] dimLens;
+
   /**
-   * array of fact table columns
-   */
-  private String[] factLevels;
-  /**
-   * array of aggregate levels
-   */
-  private String[] aggLevels;
-  /**
-   * flag for data writing request
-   */
-  private boolean isDataWritingRequest;
-  /**
-   * count of columns for which dictionary is not generated
+   * total number of no dictionary dimension in the table
    */
   private int noDictionaryCount;
   /**
-   * total number of columns in table
+   * total number of dictionary dimension and complex dimension columns in table
    */
   private int dimensionCount;
   /**
@@ -185,10 +150,11 @@ public class CarbonFactDataHandlerModel {
 
   private int taskExtension;
 
+  // key generator for complex dimension
+  private KeyGenerator[] complexDimensionKeyGenerator;
+
   /**
    * Create the model using @{@link CarbonDataLoadConfiguration}
-   * @param configuration
-   * @return CarbonFactDataHandlerModel
    */
   public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel(
       CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId,
@@ -207,26 +173,6 @@ public class CarbonFactDataHandlerModel {
         }
       }
     }
-    List<Integer> dimsLenList = new ArrayList<Integer>();
-    for (int eachDimLen : dimLensWithComplex) {
-      if (eachDimLen != 0) dimsLenList.add(eachDimLen);
-    }
-    int[] dimLens = new int[dimsLenList.size()];
-    for (int i = 0; i < dimsLenList.size(); i++) {
-      dimLens[i] = dimsLenList.get(i);
-    }
-
-    int dimensionCount = configuration.getDimensionCount();
-    int noDictionaryCount = configuration.getNoDictionaryCount();
-    int complexDimensionCount = configuration.getComplexDimensionCount();
-    int measureCount = configuration.getMeasureCount();
-
-    int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
-    int[] simpleDimsLen = new int[simpleDimsCount];
-    for (int i = 0; i < simpleDimsCount; i++) {
-      simpleDimsLen[i] = dimLens[i];
-    }
-
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
         identifier.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + identifier
             .getTableName());
@@ -235,12 +181,22 @@ public class CarbonFactDataHandlerModel {
             carbonTable.getMeasureByTableName(identifier.getTableName()));
     int[] colCardinality =
         CarbonUtil.getFormattedCardinality(dimLensWithComplex, wrapperColumnSchema);
+
     SegmentProperties segmentProperties =
         new SegmentProperties(wrapperColumnSchema, colCardinality);
-    // Actual primitive dimension used to generate start & end key
 
-    KeyGenerator keyGenerator = segmentProperties.getDimensionKeyGenerator();
+    int[] dimLens = configuration.calcDimensionLengths();
+
+    int dimensionCount = configuration.getDimensionCount();
+    int noDictionaryCount = configuration.getNoDictionaryCount();
+    int complexDimensionCount = configuration.getComplexColumnCount();
+    int measureCount = configuration.getMeasureCount();
 
+    int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
+    int[] simpleDimsLen = new int[simpleDimsCount];
+    for (int i = 0; i < simpleDimsCount; i++) {
+      simpleDimsLen[i] = dimLens[i];
+    }
     //To Set MDKey Index of each primitive type in complex type
     int surrIndex = simpleDimsCount;
     Iterator<Map.Entry<String, GenericDataType>> complexMap =
@@ -271,30 +227,23 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel
         .setTableName(identifier.getTableName());
     carbonFactDataHandlerModel.setMeasureCount(measureCount);
-    carbonFactDataHandlerModel.setMdKeyLength(keyGenerator.getKeySizeInBytes());
     carbonFactDataHandlerModel.setStoreLocation(storeLocation);
     carbonFactDataHandlerModel.setDimLens(dimLens);
     carbonFactDataHandlerModel.setNoDictionaryCount(noDictionaryCount);
-    carbonFactDataHandlerModel
-        .setDimensionCount(configuration.getDimensionCount() - noDictionaryCount);
+    carbonFactDataHandlerModel.setDimensionCount(
+        configuration.getDimensionCount() - noDictionaryCount);
     carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
     carbonFactDataHandlerModel.setColCardinality(colCardinality);
-    carbonFactDataHandlerModel.setDataWritingRequest(true);
-    carbonFactDataHandlerModel.setMeasureDataType(CarbonDataProcessorUtil
-        .getMeasureDataType(configuration.getMeasureCount(), configuration.getMeasureFields()));
-    carbonFactDataHandlerModel.setFactDimLens(dimLens);
+    carbonFactDataHandlerModel.setMeasureDataType(configuration.getMeasureDataType());
     carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
     carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndex);
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
-    if (noDictionaryCount > 0 || complexDimensionCount > 0) {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
-    } else {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
-    }
+    carbonFactDataHandlerModel.setComplexDimensionKeyGenerator(
+        configuration.createKeyGeneratorForComplexDimension());
     carbonFactDataHandlerModel.bucketId = bucketId;
     carbonFactDataHandlerModel.segmentId = configuration.getSegmentId();
     carbonFactDataHandlerModel.taskExtension = taskExtension;
@@ -315,8 +264,6 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName());
     carbonFactDataHandlerModel.setTableName(tableName);
     carbonFactDataHandlerModel.setMeasureCount(segmentProperties.getMeasures().size());
-    carbonFactDataHandlerModel
-        .setMdKeyLength(segmentProperties.getDimensionKeyGenerator().getKeySizeInBytes());
     carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
     carbonFactDataHandlerModel.setDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setSegmentProperties(segmentProperties);
@@ -337,14 +284,12 @@ public class CarbonFactDataHandlerModel {
     Map<Integer, GenericDataType> complexIndexMap =
         new HashMap<Integer, GenericDataType>(segmentProperties.getComplexDimensions().size());
     carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
-    carbonFactDataHandlerModel.setDataWritingRequest(true);
     DataType[] aggType = new DataType[segmentProperties.getMeasures().size()];
     int i = 0;
     for (CarbonMeasure msr : segmentProperties.getMeasures()) {
       aggType[i++] = msr.getDataType();
     }
     carbonFactDataHandlerModel.setMeasureDataType(aggType);
-    carbonFactDataHandlerModel.setFactDimLens(segmentProperties.getDimColumnsCardinality());
     String carbonDataDirectoryPath = CarbonDataProcessorUtil
         .checkAndCreateCarbonStoreLocation(loadModel.getStorePath(), loadModel.getDatabaseName(),
             tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
@@ -359,12 +304,6 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
-    if (segmentProperties.getNumberOfNoDictionaryDimension() > 0
-        || segmentProperties.getComplexDimensions().size() > 0) {
-      carbonFactDataHandlerModel.setMdKeyIndex(segmentProperties.getMeasures().size() + 1);
-    } else {
-      carbonFactDataHandlerModel.setMdKeyIndex(segmentProperties.getMeasures().size());
-    }
     return carbonFactDataHandlerModel;
   }
 
@@ -426,22 +365,6 @@ public class CarbonFactDataHandlerModel {
     this.measureCount = measureCount;
   }
 
-  public int getMdKeyLength() {
-    return mdKeyLength;
-  }
-
-  public void setMdKeyLength(int mdKeyLength) {
-    this.mdKeyLength = mdKeyLength;
-  }
-
-  public int getMdKeyIndex() {
-    return mdKeyIndex;
-  }
-
-  public void setMdKeyIndex(int mdKeyIndex) {
-    this.mdKeyIndex = mdKeyIndex;
-  }
-
   public String getStoreLocation() {
     return storeLocation;
   }
@@ -450,10 +373,6 @@ public class CarbonFactDataHandlerModel {
     this.storeLocation = storeLocation;
   }
 
-  public void setFactDimLens(int[] factDimLens) {
-    this.factDimLens = factDimLens;
-  }
-
   public int[] getDimLens() {
     return dimLens;
   }
@@ -462,10 +381,6 @@ public class CarbonFactDataHandlerModel {
     this.dimLens = dimLens;
   }
 
-  public void setDataWritingRequest(boolean dataWritingRequest) {
-    isDataWritingRequest = dataWritingRequest;
-  }
-
   public int getNoDictionaryCount() {
     return noDictionaryCount;
   }
@@ -590,5 +505,22 @@ public class CarbonFactDataHandlerModel {
   public int getTaskExtension() {
     return taskExtension;
   }
+
+  public KeyGenerator[] getComplexDimensionKeyGenerator() {
+    return complexDimensionKeyGenerator;
+  }
+
+  public void setComplexDimensionKeyGenerator(KeyGenerator[] complexDimensionKeyGenerator) {
+    this.complexDimensionKeyGenerator = complexDimensionKeyGenerator;
+  }
+
+  public KeyGenerator getMDKeyGenerator() {
+    return segmentProperties.getDimensionKeyGenerator();
+  }
+
+  // return the number of complex columns
+  public int getComplexColumnCount() {
+    return complexIndexMap.size();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
index 42a32d7..16d2da0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandler.java
@@ -17,12 +17,13 @@
 
 package org.apache.carbondata.processing.store;
 
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 
 public interface CarbonFactHandler {
   void initialise() throws CarbonDataWriterException;
 
-  void addDataToStore(Object[] row) throws CarbonDataWriterException;
+  void addDataToStore(CarbonRow row) throws CarbonDataWriterException;
 
   void finish() throws CarbonDataWriterException;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
new file mode 100644
index 0000000..cf1b746
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
+import org.apache.carbondata.core.datastore.page.FixLengthColumnPage;
+import org.apache.carbondata.core.datastore.page.KeyColumnPage;
+import org.apache.carbondata.core.datastore.page.VarLengthColumnPage;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.WriteStepRowUtil;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+import org.apache.spark.sql.types.Decimal;
+
+/**
+ * Represents one page of data for all columns. The data is stored in columnar layout so
+ * that all processing applied to a TablePage can be done in a vectorized fashion.
+ */
+class TablePage {
+
+  // For all dimension and measure columns, we store the column data directly in the page,
+  // the length of the page is the number of rows.
+
+  // TODO: we should have separate class for key columns so that keys are stored together in
+  // one vector to make it efficient for sorting
+  private KeyColumnPage keyColumnPage;
+  private VarLengthColumnPage[] noDictDimensionPage;
+  private ComplexColumnPage[] complexDimensionPage;
+  private FixLengthColumnPage[] measurePage;
+
+  // the num of rows in this page, it must be less than short value (65536)
+  private int pageSize;
+
+  private CarbonFactDataHandlerModel model;
+
+  TablePage(CarbonFactDataHandlerModel model, int pageSize) {
+    this.model = model;
+    this.pageSize = pageSize;
+    keyColumnPage = new KeyColumnPage(pageSize,
+        model.getSegmentProperties().getDimensionPartitions().length);
+    noDictDimensionPage = new VarLengthColumnPage[model.getNoDictionaryCount()];
+    for (int i = 0; i < noDictDimensionPage.length; i++) {
+      noDictDimensionPage[i] = new VarLengthColumnPage(pageSize);
+    }
+    complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
+    for (int i = 0; i < complexDimensionPage.length; i++) {
+      // here we still do not know the depth of the complex column; it will be initialized when
+      // we get the first row.
+      complexDimensionPage[i] = null;
+    }
+    measurePage = new FixLengthColumnPage[model.getMeasureCount()];
+    DataType[] dataTypes = model.getMeasureDataType();
+    for (int i = 0; i < measurePage.length; i++) {
+      measurePage[i] = new FixLengthColumnPage(dataTypes[i], pageSize);
+    }
+  }
+
+  /**
+   * Add one row to the internal store, it will be converted into columnar layout
+   *
+   * @param rowId Id of the input row
+   * @param row   row object
+   */
+  void addRow(int rowId, CarbonRow row) throws KeyGenException {
+    // convert each column category
+
+    // 1. convert dictionary columns
+    byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+    byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk);
+    keyColumnPage.putKey(rowId, keys);
+
+    // 2. convert noDictionary columns and complex columns.
+    int noDictionaryCount = noDictDimensionPage.length;
+    int complexColumnCount = complexDimensionPage.length;
+    if (noDictionaryCount > 0 || complexColumnCount > 0) {
+      byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
+      for (int i = 0; i < noDictAndComplex.length; i++) {
+        if (i < noDictionaryCount) {
+          // noDictionary columns, since it is variable length, we need to prepare each
+          // element as LV encoded byte array (first two bytes are the length of the array)
+          byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
+          noDictDimensionPage[i].putByteArray(rowId, valueWithLength);
+        } else {
+          // complex columns
+          addComplexColumn(i - noDictionaryCount, rowId, noDictAndComplex[i]);
+        }
+      }
+    }
+
+    // 3. convert measure columns
+    Object[] measureColumns = WriteStepRowUtil.getMeasure(row);
+    for (int i = 0; i < measurePage.length; i++) {
+      Object value = measureColumns[i];
+
+      // in compaction flow the measure with decimal type will come as Spark decimal.
+      // need to convert it to byte array.
+      if (measurePage[i].getDataType() == DataType.DECIMAL && model.isCompactionFlow()) {
+        BigDecimal bigDecimal = ((Decimal) value).toJavaBigDecimal();
+        value = DataTypeUtil.bigDecimalToByte(bigDecimal);
+      }
+      measurePage[i].putData(rowId, value);
+    }
+  }
+
+  /**
+   * add a complex column into the internal member complexDimensionPage
+   *
+   * @param index          index of the complexDimensionPage
+   * @param rowId          Id of the input row
+   * @param complexColumns byte array of the complex column to be added, extracted from input row
+   */
+  // TODO: this function should be refactored; ColumnPage should support complex type encoding
+  // directly instead of doing it here
+  private void addComplexColumn(int index, int rowId, byte[] complexColumns) {
+    GenericDataType complexDataType = model.getComplexIndexMap().get(
+        index + model.getPrimitiveDimLens().length);
+
+    // initialize the page if first row
+    if (rowId == 0) {
+      int depthInComplexColumn = complexDataType.getColsCount();
+      getComplexDimensionPage()[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
+    }
+
+    int depthInComplexColumn = getComplexDimensionPage()[index].getDepth();
+    // this is the encoded columnar data which will be added to page,
+    // size of this list is the depth of complex column, we will fill it by input data
+    List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
+    for (int k = 0; k < depthInComplexColumn; k++) {
+      encodedComplexColumnar.add(new ArrayList<byte[]>());
+    }
+
+    // encode the complex type data and fill columnsArray
+    try {
+      ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
+      ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
+      DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
+      complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream,
+          model.getComplexDimensionKeyGenerator());
+      complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
+          ByteBuffer.wrap(byteArrayOutput.toByteArray()));
+      byteArrayOutput.close();
+    } catch (IOException | KeyGenException e) {
+      throw new CarbonDataWriterException("Problem while bit packing and writing complex datatype",
+          e);
+    }
+
+    for (int depth = 0; depth < depthInComplexColumn; depth++) {
+      getComplexDimensionPage()[index]
+          .putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
+    }
+  }
+
+  // Adds length as a short element (first 2 bytes) to the head of the input byte array
+  private byte[] addLengthToByteArray(byte[] input) {
+    byte[] output = new byte[input.length + 2];
+    ByteBuffer buffer = ByteBuffer.wrap(output);
+    buffer.putShort((short) input.length);
+    buffer.put(input, 0, input.length);
+    return output;
+  }
+
+  public KeyColumnPage getKeyColumnPage() {
+    return keyColumnPage;
+  }
+
+  public VarLengthColumnPage[] getNoDictDimensionPage() {
+    return noDictDimensionPage;
+  }
+
+  public ComplexColumnPage[] getComplexDimensionPage() {
+    return complexDimensionPage;
+  }
+
+  public FixLengthColumnPage[] getMeasurePage() {
+    return measurePage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
index def2aaa..8fa8432 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/colgroup/ColGroupBlockStorage.java
@@ -39,14 +39,6 @@ public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage
     }
   }
 
-  @Deprecated
-  private ColGroupDataHolder colGrpDataHolder;
-
-  @Deprecated
-  public ColGroupBlockStorage(DataHolder colGrpDataHolder) {
-    this.colGrpDataHolder = (ColGroupDataHolder) colGrpDataHolder;
-  }
-
   /**
    * sorting is not required for colgroup storage and hence return true
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 770d24c..69c4eb1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -72,9 +72,7 @@ import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterE
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.io.IOUtils;
 
-public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T>
-
-{
+public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<T> {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(AbstractFactDataWriter.class.getName());
@@ -141,10 +139,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * file size at any given point
    */
   private long currentFileSize;
-  /**
-   * size reserved in one file for writing block meta data. It will be in percentage
-   */
-  private int spaceReservedForBlockMetaSize;
 
   protected FileOutputStream fileOutputStream;
 
@@ -171,7 +165,10 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
     this.fileSizeInBytes =
         (long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR
             * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR;
-    this.spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
+    /*
+     * Percentage of the file size reserved for writing block meta data.
+     */
+    int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance
         .getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE,
             CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT));
     this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
index ce53ec8..8fbf9c0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java
@@ -32,8 +32,6 @@ public class CarbonDataWriterVo {
 
   private int measureCount;
 
-  private int mdKeyLength;
-
   private String tableName;
 
   private IFileManagerComposite fileManager;
@@ -52,8 +50,6 @@ public class CarbonDataWriterVo {
 
   private List<ColumnSchema> wrapperColumnSchemaList;
 
-  private int numberOfNoDictionaryColumn;
-
   private boolean[] isDictionaryColumn;
 
   private String carbonDataDirectoryPath;
@@ -99,13 +95,6 @@ public class CarbonDataWriterVo {
   }
 
   /**
-   * @param mdKeyLength the mdKeyLength to set
-   */
-  public void setMdKeyLength(int mdKeyLength) {
-    this.mdKeyLength = mdKeyLength;
-  }
-
-  /**
    * @return the tableName
    */
   public String getTableName() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index 18f1b2e..4c4fead 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -357,8 +357,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter<short[]>
     // calculate the size of data chunks
     try {
       for (int i = 0; i < nodeHolderList.get(0).getKeyArray().length; i++) {
-        dataChunkBytes[i] = CarbonUtil.getByteArray(CarbonMetadataUtil
-            .getDataChunk3(nodeHolderList, thriftColumnSchemaList,
+        dataChunkBytes[i] = CarbonUtil.getByteArray(
+            CarbonMetadataUtil.getDataChunk3(nodeHolderList, thriftColumnSchemaList,
                 dataWriterVo.getSegmentProperties(), i, true));
         blockletDataSize += dataChunkBytes[i].length;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/353272ef/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 34d822c..740b5dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -32,13 +32,10 @@ import java.util.Set;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -56,7 +53,6 @@ import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
 
 import org.apache.commons.lang3.ArrayUtils;
@@ -93,32 +89,6 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * Utility method to get level cardinality string
-   *
-   * @param dimCardinalities
-   * @param aggDims
-   * @return level cardinality string
-   */
-  public static String getLevelCardinalitiesString(Map<String, String> dimCardinalities,
-      String[] aggDims) {
-    StringBuilder sb = new StringBuilder();
-
-    for (int i = 0; i < aggDims.length; i++) {
-      String string = dimCardinalities.get(aggDims[i]);
-      if (string != null) {
-        sb.append(string);
-        sb.append(CarbonCommonConstants.COMA_SPC_CHARACTER);
-      }
-    }
-    String resultStr = sb.toString();
-    if (resultStr.endsWith(CarbonCommonConstants.COMA_SPC_CHARACTER)) {
-      resultStr = resultStr
-          .substring(0, resultStr.length() - CarbonCommonConstants.COMA_SPC_CHARACTER.length());
-    }
-    return resultStr;
-  }
-
-  /**
    * @param storeLocation
    */
   public static void renameBadRecordsFromInProgressToNormal(String storeLocation) {
@@ -237,9 +207,9 @@ public final class CarbonDataProcessorUtil {
         break;
       }
 
-      if (!field.hasDictionaryEncoding() && field.getColumn().isDimesion()) {
+      if (!field.hasDictionaryEncoding() && field.getColumn().isDimension()) {
         noDictionaryMapping.add(true);
-      } else if (field.getColumn().isDimesion()) {
+      } else if (field.getColumn().isDimension()) {
         noDictionaryMapping.add(false);
       }
     }
@@ -253,9 +223,9 @@ public final class CarbonDataProcessorUtil {
   public static boolean[] getIsUseInvertedIndex(DataField[] fields) {
     List<Boolean> isUseInvertedIndexList = new ArrayList<Boolean>();
     for (DataField field : fields) {
-      if (field.getColumn().isUseInvertedIndex() && field.getColumn().isDimesion()) {
+      if (field.getColumn().isUseInvertedIndex() && field.getColumn().isDimension()) {
         isUseInvertedIndexList.add(true);
-      } else if (field.getColumn().isDimesion()) {
+      } else if (field.getColumn().isDimension()) {
         isUseInvertedIndexList.add(false);
       }
     }
@@ -349,13 +319,6 @@ public final class CarbonDataProcessorUtil {
     return true;
   }
 
-  public static boolean isHeaderValid(String tableName, String header,
-      CarbonDataLoadSchema schema, String delimiter) {
-    String convertedDelimiter = CarbonUtil.delimiterConverter(delimiter);
-    String[] csvHeader = getColumnFields(header.toLowerCase(), convertedDelimiter);
-    return isHeaderValid(tableName, csvHeader, schema);
-  }
-
   /**
    * This method update the column Name
    *
@@ -388,25 +351,6 @@ public final class CarbonDataProcessorUtil {
     return columnNames;
   }
 
-  /**
-   * Splits header to fields using delimiter.
-   * @param header
-   * @param delimiter
-   * @return
-   */
-  public static String[] getColumnFields(String header, String delimiter) {
-    delimiter = CarbonUtil.delimiterConverter(delimiter);
-    String[] columnNames = header.split(delimiter);
-    String tmpCol;
-    for (int i = 0; i < columnNames.length; i++) {
-      tmpCol = columnNames[i].replaceAll("\"", "");
-      columnNames[i] = tmpCol.trim();
-    }
-
-    return columnNames;
-  }
-
-
   public static DataType[] getMeasureDataType(int measureCount, String databaseName,
       String tableName) {
     DataType[] type = new DataType[measureCount];
@@ -449,45 +393,6 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * This method will convert surrogate key to MD key and fill the row in format
-   * required by the writer for further processing
-   *
-   * @param row
-   * @param segmentProperties
-   * @param measureCount
-   * @param noDictionaryCount
-   * @param complexDimensionCount
-   * @return
-   * @throws KeyGenException
-   */
-  public static Object[] convertToMDKeyAndFillRow(CarbonRow row,
-      SegmentProperties segmentProperties, int measureCount, int noDictionaryCount,
-      int complexDimensionCount) throws KeyGenException {
-    Object[] outputRow = null;
-    // adding one for the high cardinality dims byte array.
-    if (noDictionaryCount > 0 || complexDimensionCount > 0) {
-      outputRow = new Object[measureCount + 1 + 1];
-    } else {
-      outputRow = new Object[measureCount + 1];
-    }
-    int l = 0;
-    int index = 0;
-    Object[] measures = row.getObjectArray(IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex());
-    for (int i = 0; i < measureCount; i++) {
-      outputRow[l++] = measures[index++];
-    }
-    outputRow[l] = row.getObject(IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex());
-    int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
-    int[] dimsArray = row.getIntArray(IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex());
-    for (int i = 0; i < highCardExcludedRows.length; i++) {
-      highCardExcludedRows[i] = dimsArray[i];
-    }
-    outputRow[outputRow.length - 1] =
-        segmentProperties.getDimensionKeyGenerator().generateKey(highCardExcludedRows);
-    return outputRow;
-  }
-
-  /**
    * This method will get the store location for the given path, segment id and partition id
    *
    * @return data directory path