Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/08 17:03:18 UTC

[1/2] carbondata git commit: [CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort temp row

Repository: carbondata
Updated Branches:
  refs/heads/carbonstore cd7eed66b -> de92ea9a1


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 11b3d43..527452a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -31,15 +31,14 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
@@ -63,21 +62,15 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    * entry count
    */
   private int entryCount;
-
   /**
    * return row
    */
-  private Object[] returnRow;
-  private int dimCnt;
-  private int complexCnt;
-  private int measureCnt;
-  private boolean[] isNoDictionaryDimensionColumn;
-  private DataType[] measureDataTypes;
+  private IntermediateSortTempRow returnRow;
   private int readBufferSize;
   private String compressorName;
-  private Object[][] currentBuffer;
+  private IntermediateSortTempRow[] currentBuffer;
 
-  private Object[][] backupBuffer;
+  private IntermediateSortTempRow[] backupBuffer;
 
   private boolean isBackupFilled;
 
@@ -100,27 +93,21 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
   private int numberOfObjectRead;
 
-  private int nullSetWordsLength;
-
-  private Comparator<Object[]> comparator;
-
+  private TableFieldStat tableFieldStat;
+  private SortStepRowHandler sortStepRowHandler;
+  private Comparator<IntermediateSortTempRow> comparator;
   /**
    * Constructor to initialize
    */
   public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
     // set temp file
     this.tempFile = tempFile;
-    this.dimCnt = parameters.getDimColCount();
-    this.complexCnt = parameters.getComplexDimColCount();
-    this.measureCnt = parameters.getMeasureColCount();
-    this.isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
-    this.measureDataTypes = parameters.getMeasureDataType();
     this.readBufferSize = parameters.getBufferSize();
     this.compressorName = parameters.getSortTempCompressorName();
-
+    this.tableFieldStat = new TableFieldStat(parameters);
+    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.executorService = Executors.newFixedThreadPool(1);
-    this.nullSetWordsLength = ((parameters.getMeasureColCount() - 1) >> 6) + 1;
-    comparator = new NewRowComparator(parameters.getNoDictionarySortColumn());
+    comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
     initialize();
   }
 
@@ -169,11 +156,17 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    *
    * @throws CarbonSortKeyAndGroupByException problem while reading
    */
+  @Override
   public void readRow() throws CarbonSortKeyAndGroupByException {
     if (prefetch) {
       fillDataForPrefetch();
     } else {
-      this.returnRow = getRowFromStream();
+      try {
+        this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+        this.numberOfObjectRead++;
+      } catch (IOException e) {
+        throw new CarbonSortKeyAndGroupByException("Problems while reading row", e);
+      }
     }
   }
 
@@ -207,63 +200,22 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   }
 
   /**
-   * @return
-   * @throws CarbonSortKeyAndGroupByException
+   * get a batch of rows; this method is used when reading compressed sort temp files
+   *
+   * @param expected expected number of rows in a batch
+   * @return a batch of rows
+   * @throws IOException if error occurs while reading from stream
    */
-  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
-    Object[] row = new Object[dimCnt + measureCnt];
-    try {
-      int dimCount = 0;
-      for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
-        if (isNoDictionaryDimensionColumn[dimCount]) {
-          short aShort = stream.readShort();
-          byte[] col = new byte[aShort];
-          stream.readFully(col);
-          row[dimCount] = col;
-        } else {
-          int anInt = stream.readInt();
-          row[dimCount] = anInt;
-        }
-      }
-
-      // write complex dimensions here.
-      for (; dimCount < dimCnt; dimCount++) {
-        short aShort = stream.readShort();
-        byte[] col = new byte[aShort];
-        stream.readFully(col);
-        row[dimCount] = col;
-      }
-
-      long[] words = new long[nullSetWordsLength];
-      for (int i = 0; i < words.length; i++) {
-        words[i] = stream.readLong();
-      }
-
-      for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
-        if (UnsafeCarbonRowPage.isSet(words, mesCount)) {
-          DataType dataType = measureDataTypes[mesCount];
-          if (dataType == DataTypes.SHORT) {
-            row[dimCount + mesCount] = stream.readShort();
-          } else if (dataType == DataTypes.INT) {
-            row[dimCount + mesCount] = stream.readInt();
-          } else if (dataType == DataTypes.LONG) {
-            row[dimCount + mesCount] = stream.readLong();
-          } else if (dataType == DataTypes.DOUBLE) {
-            row[dimCount + mesCount] = stream.readDouble();
-          } else if (DataTypes.isDecimal(dataType)) {
-            short aShort = stream.readShort();
-            byte[] bigDecimalInBytes = new byte[aShort];
-            stream.readFully(bigDecimalInBytes);
-            row[dimCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
-          } else {
-            throw new IllegalArgumentException("unsupported data type:" + dataType);
-          }
-        }
-      }
-      return row;
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException(e);
+  private IntermediateSortTempRow[] readBatchedRowFromStream(int expected)
+      throws IOException {
+    IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
+    for (int i = 0; i < expected; i++) {
+      IntermediateSortTempRow holder
+          = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+      holders[i] = holder;
     }
+    this.numberOfObjectRead += expected;
+    return holders;
   }
 
   /**
@@ -271,7 +223,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
    *
    * @return row
    */
-  public Object[] getRow() {
+  public IntermediateSortTempRow getRow() {
     return this.returnRow;
   }
 
@@ -326,9 +278,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
 
   @Override public int hashCode() {
     int hash = 0;
-    hash += 31 * measureCnt;
-    hash += 31 * dimCnt;
-    hash += 31 * complexCnt;
+    hash += tableFieldStat.hashCode();
     hash += tempFile.hashCode();
     return hash;
   }
@@ -368,16 +318,12 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   /**
    * This method will read the records from sort temp file and keep it in a buffer
    *
-   * @param numberOfRecords
-   * @return
-   * @throws CarbonSortKeyAndGroupByException
+   * @param numberOfRecords number of records to be read
+   * @return batch of intermediate sort temp rows
+   * @throws IOException if error occurs reading records from file
    */
-  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
-      throws CarbonSortKeyAndGroupByException {
-    Object[][] records = new Object[numberOfRecords][];
-    for (int i = 0; i < numberOfRecords; i++) {
-      records[i] = getRowFromStream();
-    }
-    return records;
+  private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords)
+      throws IOException {
+    return readBatchedRowFromStream(numberOfRecords);
   }
 }
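
The core of this change, repeated across the files in this patch, is that the per-field serialization that used to be inlined in each holder and merger is delegated to a single SortStepRowHandler, which reads and writes IntermediateSortTempRow objects. As a rough illustration, a standalone reader of one sort temp file could look like the sketch below. This is only a sketch: it assumes an uncompressed, unbuffered stream (the real holder goes through FileFactory.getDataInputStream with readBufferSize and compressorName) and that the file begins with an int entry count, as written by SortDataRows later in this diff.

import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;

import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
import org.apache.carbondata.processing.sort.sortdata.SortParameters;

public class SortTempFileReaderSketch {
  /** Reads every IntermediateSortTempRow from one sort temp file (hypothetical helper). */
  public static IntermediateSortTempRow[] readAll(String filePath, SortParameters parameters)
      throws IOException {
    SortStepRowHandler handler = new SortStepRowHandler(parameters);
    try (DataInputStream stream = new DataInputStream(new FileInputStream(filePath))) {
      // sort temp files start with the number of rows they contain
      int entryCount = stream.readInt();
      IntermediateSortTempRow[] rows = new IntermediateSortTempRow[entryCount];
      for (int i = 0; i < entryCount; i++) {
        rows[i] = handler.readIntermediateSortTempRowFromInputStream(stream);
      }
      return rows;
    }
  }
}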

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index 4bbf61b..22673ff 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -21,25 +21,21 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
 import java.util.AbstractQueue;
-import java.util.Arrays;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeSortTempFileChunkHolder;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 public class UnsafeIntermediateFileMerger implements Callable<Void> {
   /**
@@ -69,22 +65,13 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   private int totalNumberOfRecords;
 
   private SortParameters mergerParameters;
-
+  private TableFieldStat tableFieldStat;
   private File[] intermediateFiles;
-
   private File outPutFile;
 
-  private int dimCnt;
-  private int complexCnt;
-  private int measureCnt;
-  private boolean[] isNoDictionaryDimensionColumn;
-  private DataType[] measureDataTypes;
   private int writeBufferSize;
   private String compressorName;
-
-  private long[] nullSetWords;
-
-  private ByteBuffer rowData;
+  private SortStepRowHandler sortStepRowHandler;
 
   private Throwable throwable;
 
@@ -97,16 +84,10 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
-    this.dimCnt = mergerParameters.getDimColCount();
-    this.complexCnt = mergerParameters.getComplexDimColCount();
-    this.measureCnt = mergerParameters.getMeasureColCount();
-    this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
-    this.measureDataTypes = mergerParameters.getMeasureDataType();
     this.writeBufferSize = mergerParameters.getBufferSize();
     this.compressorName = mergerParameters.getSortTempCompressorName();
-    this.nullSetWords = new long[((measureCnt - 1) >> 6) + 1];
-    // Take size of 2 MB for each row. I think it is high enough to use
-    rowData = ByteBuffer.allocate(2 * 1024 * 1024);
+    this.tableFieldStat = new TableFieldStat(mergerParameters);
+    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
   }
 
   @Override public Void call() throws Exception {
@@ -165,13 +146,14 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get the sorted record from file
+   * This method will be used to get the sorted sort temp row from the sort temp files
    *
    * @return sorted record sorted record
    * @throws CarbonSortKeyAndGroupByException
    */
-  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
-    Object[] row = null;
+  private IntermediateSortTempRow getSortedRecordFromFile()
+      throws CarbonSortKeyAndGroupByException {
+    IntermediateSortTempRow row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will
@@ -235,7 +217,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
       this.recordHolderHeap.add(sortTempFileChunkHolder);
     }
 
-    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+    LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
   }
 
   /**
@@ -250,12 +232,12 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get the sorted row
+   * This method will be used to get the sorted sort temp row
    *
    * @return sorted row
    * @throws CarbonSortKeyAndGroupByException
    */
-  private Object[] next() throws CarbonSortKeyAndGroupByException {
+  private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
     return getSortedRecordFromFile();
   }
 
@@ -272,82 +254,16 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
   /**
    * Below method will be used to write data to file
    *
-   * @throws CarbonSortKeyAndGroupByException problem while writing
+   * @throws IOException problem while writing
    */
-  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException, IOException {
-    int dimCount = 0;
-    int size = 0;
-    for (; dimCount < isNoDictionaryDimensionColumn.length; dimCount++) {
-      if (isNoDictionaryDimensionColumn[dimCount]) {
-        byte[] col = (byte[]) row[dimCount];
-        rowData.putShort((short) col.length);
-        size += 2;
-        rowData.put(col);
-        size += col.length;
-      } else {
-        rowData.putInt((int) row[dimCount]);
-        size += 4;
-      }
-    }
-
-    // write complex dimensions here.
-    int dimensionSize = dimCnt + complexCnt;
-    for (; dimCount < dimensionSize; dimCount++) {
-      byte[] col = (byte[]) row[dimCount];
-      rowData.putShort((short)col.length);
-      size += 2;
-      rowData.put(col);
-      size += col.length;
-    }
-    Arrays.fill(nullSetWords, 0);
-    int nullSetSize = nullSetWords.length * 8;
-    int nullLoc = size;
-    size += nullSetSize;
-    for (int mesCount = 0; mesCount < measureCnt; mesCount++) {
-      Object value = row[mesCount + dimensionSize];
-      if (null != value) {
-        DataType dataType = measureDataTypes[mesCount];
-        if (dataType == DataTypes.SHORT) {
-          rowData.putShort(size, (Short) value);
-          size += 2;
-        } else if (dataType == DataTypes.INT) {
-          rowData.putInt(size, (Integer) value);
-          size += 4;
-        } else if (dataType == DataTypes.LONG) {
-          rowData.putLong(size, (Long) value);
-          size += 8;
-        } else if (dataType == DataTypes.DOUBLE) {
-          rowData.putDouble(size, (Double) value);
-          size += 8;
-        } else if (DataTypes.isDecimal(dataType)) {
-          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(((BigDecimal) value));
-          rowData.putShort(size, (short) bigDecimalInBytes.length);
-          size += 2;
-          for (int i = 0; i < bigDecimalInBytes.length; i++) {
-            rowData.put(size++, bigDecimalInBytes[i]);
-          }
-        }
-        UnsafeCarbonRowPage.set(nullSetWords, mesCount);
-      } else {
-        UnsafeCarbonRowPage.unset(nullSetWords, mesCount);
-      }
-    }
-    for (int i = 0; i < nullSetWords.length; i++) {
-      rowData.putLong(nullLoc, nullSetWords[i]);
-      nullLoc += 8;
-    }
-    byte[] rowBytes = new byte[size];
-    rowData.position(0);
-    rowData.get(rowBytes);
-    stream.write(rowBytes);
-    rowData.clear();
+  private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
+    sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
   }
 
   private void finish() throws CarbonSortKeyAndGroupByException {
     clear();
     try {
       CarbonUtil.deleteFiles(intermediateFiles);
-      rowData.clear();
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while deleting the intermediate files");
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index ce118d9..64f3c25 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -29,7 +29,8 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.loading.sort.SortStepRowUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeFinalMergePageHolder;
@@ -55,7 +56,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   private AbstractQueue<SortTempChunkHolder> recordHolderHeapLocal;
 
   private SortParameters parameters;
-  private SortStepRowUtil sortStepRowUtil;
+  private SortStepRowHandler sortStepRowHandler;
   /**
    * tempFileLocation
    */
@@ -68,7 +69,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters,
       String[] tempFileLocation) {
     this.parameters = parameters;
-    this.sortStepRowUtil = new SortStepRowUtil(parameters);
+    this.sortStepRowHandler = new SortStepRowHandler(parameters);
     this.tempFileLocation = tempFileLocation;
     this.tableName = parameters.getTableName();
   }
@@ -108,9 +109,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       LOGGER.info("Started adding first record from each page");
       for (final UnsafeCarbonRowPage rowPage : rowPages) {
 
-        SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage,
-            parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
-                .getMeasureColCount(), parameters.getNumberOfSortColumns());
+        SortTempChunkHolder sortTempFileChunkHolder = new UnsafeInmemoryHolder(rowPage);
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -121,9 +120,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       for (final UnsafeInMemoryIntermediateDataMerger merger : merges) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn(),
-                parameters.getDimColCount() + parameters.getComplexDimColCount() + parameters
-                    .getMeasureColCount());
+            new UnsafeFinalMergePageHolder(merger, parameters.getNoDictionarySortColumn());
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -142,7 +139,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
         recordHolderHeapLocal.add(sortTempFileChunkHolder);
       }
 
-      LOGGER.info("Heap Size" + this.recordHolderHeapLocal.size());
+      LOGGER.info("Heap Size: " + this.recordHolderHeapLocal.size());
     } catch (Exception e) {
       LOGGER.error(e);
       throw new CarbonDataWriterException(e);
@@ -180,12 +177,14 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
   }
 
   /**
-   * This method will be used to get the sorted row
+   * This method will be used to get the sorted row in the 3-parted format.
+   * The converted row will be fed to the subsequent writer step.
    *
    * @return sorted row
    */
   public Object[] next() {
-    return sortStepRowUtil.convertRow(getSortedRecordFromFile());
+    IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+    return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
   }
 
   /**
@@ -193,8 +192,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
    *
    * @return sorted record sorted record
    */
-  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
-    Object[] row = null;
+  private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException {
+    IntermediateSortTempRow row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will
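
Downstream, the final merger is still consumed as a CarbonIterator<Object[]>; only its internal row representation changed. A hypothetical consumer loop follows (hasNext() and the merge start-up come from the existing iterator contract and are not part of this diff, so treat them as assumptions):

import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;

public class FinalMergeConsumerSketch {
  public static long drain(UnsafeSingleThreadFinalSortFilesMerger merger) {
    long rowCount = 0;
    while (merger.hasNext()) {
      // each next() call reads one IntermediateSortTempRow from the heap and
      // converts it to the 3-parted Object[] layout expected by the writer step
      Object[] writeStepRow = merger.next();
      if (writeStepRow != null) {
        rowCount++;
      }
    }
    return rowCount;
  }
}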

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 8fc6e66..bd1d0a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -390,7 +390,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
           noDictionarySortColumnMapping, 0, noDictionarySortColumnMapping.length);
     }
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
-
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 04efa1f..c06819c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -21,7 +21,6 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
@@ -29,11 +28,9 @@ import java.util.concurrent.Callable;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 public class IntermediateFileMerger implements Callable<Void> {
@@ -68,17 +65,12 @@ public class IntermediateFileMerger implements Callable<Void> {
   private File[] intermediateFiles;
 
   private File outPutFile;
-  private int dimCnt;
-  private int noDictDimCnt;
-  private int complexCnt;
-  private int measureCnt;
-  private boolean[] isNoDictionaryDimensionColumn;
-  private DataType[] measureDataTypes;
   private int writeBufferSize;
   private String compressorName;
 
   private Throwable throwable;
-
+  private TableFieldStat tableFieldStat;
+  private SortStepRowHandler sortStepRowHandler;
   /**
    * IntermediateFileMerger Constructor
    */
@@ -88,14 +80,10 @@ public class IntermediateFileMerger implements Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
-    this.dimCnt = mergerParameters.getDimColCount();
-    this.noDictDimCnt = mergerParameters.getNoDictionaryCount();
-    this.complexCnt = mergerParameters.getComplexDimColCount();
-    this.measureCnt = mergerParameters.getMeasureColCount();
-    this.isNoDictionaryDimensionColumn = mergerParameters.getNoDictionaryDimnesionColumn();
-    this.measureDataTypes = mergerParameters.getMeasureDataType();
     this.writeBufferSize = mergerParameters.getBufferSize();
     this.compressorName = mergerParameters.getSortTempCompressorName();
+    this.tableFieldStat = new TableFieldStat(mergerParameters);
+    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
   }
 
   @Override public Void call() throws Exception {
@@ -154,13 +142,14 @@ public class IntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get the sorted record from file
+   * This method will be used to get the sorted sort temp row from the sort temp files
    *
    * @return sorted record sorted record
    * @throws CarbonSortKeyAndGroupByException
    */
-  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
-    Object[] row = null;
+  private IntermediateSortTempRow getSortedRecordFromFile()
+      throws CarbonSortKeyAndGroupByException {
+    IntermediateSortTempRow row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will
@@ -227,7 +216,7 @@ public class IntermediateFileMerger implements Callable<Void> {
       this.recordHolderHeap.add(sortTempFileChunkHolder);
     }
 
-    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+    LOGGER.info("Heap Size: " + this.recordHolderHeap.size());
   }
 
   /**
@@ -242,12 +231,12 @@ public class IntermediateFileMerger implements Callable<Void> {
   }
 
   /**
-   * This method will be used to get the sorted row
+   * This method will be used to get the sorted sort temp row
    *
    * @return sorted row
    * @throws CarbonSortKeyAndGroupByException
    */
-  private Object[] next() throws CarbonSortKeyAndGroupByException {
+  private IntermediateSortTempRow next() throws CarbonSortKeyAndGroupByException {
     return getSortedRecordFromFile();
   }
 
@@ -264,62 +253,10 @@ public class IntermediateFileMerger implements Callable<Void> {
   /**
    * Below method will be used to write data to file
    *
-   * @throws CarbonSortKeyAndGroupByException problem while writing
+   * @throws IOException problem while writing
    */
-  private void writeDataToFile(Object[] row) throws CarbonSortKeyAndGroupByException {
-    try {
-      int[] mdkArray = (int[]) row[0];
-      byte[][] nonDictArray = (byte[][]) row[1];
-      int mdkIndex = 0;
-      int nonDictKeyIndex = 0;
-      // write dictionary and non dictionary dimensions here.
-      for (boolean nodictinary : isNoDictionaryDimensionColumn) {
-        if (nodictinary) {
-          byte[] col = nonDictArray[nonDictKeyIndex++];
-          stream.writeShort(col.length);
-          stream.write(col);
-        } else {
-          stream.writeInt(mdkArray[mdkIndex++]);
-        }
-      }
-      // write complex
-      for (; nonDictKeyIndex < noDictDimCnt + complexCnt; nonDictKeyIndex++) {
-        byte[] col = nonDictArray[nonDictKeyIndex++];
-        stream.writeShort(col.length);
-        stream.write(col);
-      }
-      // write measure
-      int fieldIndex = 0;
-      for (int counter = 0; counter < measureCnt; counter++) {
-        if (null != NonDictionaryUtil.getMeasure(fieldIndex, row)) {
-          stream.write((byte) 1);
-          DataType dataType = measureDataTypes[counter];
-          if (dataType == DataTypes.BOOLEAN) {
-            stream.writeBoolean((boolean)NonDictionaryUtil.getMeasure(fieldIndex, row));
-          } else if (dataType == DataTypes.SHORT) {
-            stream.writeShort((short) NonDictionaryUtil.getMeasure(fieldIndex, row));
-          } else if (dataType == DataTypes.INT) {
-            stream.writeInt((int) NonDictionaryUtil.getMeasure(fieldIndex, row));
-          } else if (dataType == DataTypes.LONG) {
-            stream.writeLong((long) NonDictionaryUtil.getMeasure(fieldIndex, row));
-          } else if (dataType == DataTypes.DOUBLE) {
-            stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
-          } else if (DataTypes.isDecimal(dataType)) {
-            byte[] bigDecimalInBytes = DataTypeUtil
-                .bigDecimalToByte((BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row));
-            stream.writeInt(bigDecimalInBytes.length);
-            stream.write(bigDecimalInBytes);
-          } else {
-            throw new IllegalArgumentException("unsupported data type:" + dataType);
-          }
-        } else {
-          stream.write((byte) 0);
-        }
-        fieldIndex++;
-      }
-    } catch (IOException e) {
-      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
-    }
+  private void writeDataToFile(IntermediateSortTempRow row) throws IOException {
+    sortStepRowHandler.writeIntermediateSortTempRowToOutputStream(row, stream);
   }
 
   private void finish() throws CarbonSortKeyAndGroupByException {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
new file mode 100644
index 0000000..9b6d1e8
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateSortTempRowComparator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+
+/**
+ * This class is used as a comparator for comparing intermediate sort temp rows
+ */
+public class IntermediateSortTempRowComparator implements Comparator<IntermediateSortTempRow> {
+  /**
+   * isSortColumnNoDictionary: whether each sort column is a no-dictionary column
+   */
+  private boolean[] isSortColumnNoDictionary;
+
+  /**
+   * @param isSortColumnNoDictionary whether each sort column is a no-dictionary column
+   */
+  public IntermediateSortTempRowComparator(boolean[] isSortColumnNoDictionary) {
+    this.isSortColumnNoDictionary = isSortColumnNoDictionary;
+  }
+
+  /**
+   * Below method will be used to compare two sort temp rows
+   */
+  public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) {
+    int diff = 0;
+    int dictIndex = 0;
+    int nonDictIndex = 0;
+
+    for (boolean isNoDictionary : isSortColumnNoDictionary) {
+
+      if (isNoDictionary) {
+        byte[] byteArr1 = rowA.getNoDictSortDims()[nonDictIndex];
+        byte[] byteArr2 = rowB.getNoDictSortDims()[nonDictIndex];
+        nonDictIndex++;
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+        if (difference != 0) {
+          return difference;
+        }
+      } else {
+        int dimFieldA = rowA.getDictSortDims()[dictIndex];
+        int dimFieldB = rowB.getDictSortDims()[dictIndex];
+        dictIndex++;
+
+        diff = dimFieldA - dimFieldB;
+        if (diff != 0) {
+          return diff;
+        }
+      }
+    }
+    return diff;
+  }
+}
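
A usage sketch for the new comparator, assuming the rows have already been read through SortStepRowHandler; the no-dictionary mapping would come from SortParameters.getNoDictionarySortColumn(), exactly as the chunk holder above wires it:

import java.util.Arrays;
import java.util.Comparator;

import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;

public class ComparatorUsageSketch {
  public static void sortInPlace(IntermediateSortTempRow[] rows,
      boolean[] noDictionarySortColumnMapping) {
    Comparator<IntermediateSortTempRow> comparator =
        new IntermediateSortTempRowComparator(noDictionarySortColumnMapping);
    // dictionary sort columns are compared as int surrogates, no-dictionary
    // sort columns as raw bytes (see compare() in the class above)
    Arrays.sort(rows, comparator);
  }
}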

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
index d2579d2..3f94533 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -40,14 +40,11 @@ public class NewRowComparator implements Comparator<Object[]> {
    */
   public int compare(Object[] rowA, Object[] rowB) {
     int diff = 0;
-
     int index = 0;
 
     for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-
       if (isNoDictionary) {
         byte[] byteArr1 = (byte[]) rowA[index];
-
         byte[] byteArr2 = (byte[]) rowB[index];
 
         int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
@@ -57,6 +54,7 @@ public class NewRowComparator implements Comparator<Object[]> {
       } else {
         int dimFieldA = (int) rowA[index];
         int dimFieldB = (int) rowB[index];
+
         diff = dimFieldA - dimFieldB;
         if (diff != 0) {
           return diff;
@@ -65,7 +63,6 @@ public class NewRowComparator implements Comparator<Object[]> {
 
       index++;
     }
-
     return diff;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
index e01b587..7538c92 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -29,7 +29,7 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
   private int numberOfSortColumns;
 
   /**
-   * RowComparatorForNormalDims Constructor
+   * NewRowComparatorForNormalDims Constructor
    *
    * @param numberOfSortColumns
    */
@@ -46,7 +46,6 @@ public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
     int diff = 0;
 
     for (int i = 0; i < numberOfSortColumns; i++) {
-
       int dimFieldA = (int)rowA[i];
       int dimFieldB = (int)rowB[i];
       diff = dimFieldA - dimFieldB;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
deleted file mode 100644
index 0ae0b93..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.nio.ByteBuffer;
-import java.util.Comparator;
-
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-public class RowComparator implements Comparator<Object[]> {
-  /**
-   * noDictionaryCount represent number of no dictionary cols
-   */
-  private int noDictionaryCount;
-
-  /**
-   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
-   */
-  private boolean[] noDictionarySortColumnMaping;
-
-  /**
-   * @param noDictionarySortColumnMaping
-   * @param noDictionaryCount
-   */
-  public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
-    this.noDictionaryCount = noDictionaryCount;
-    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
-  }
-
-  /**
-   * Below method will be used to compare two mdkey
-   */
-  public int compare(Object[] rowA, Object[] rowB) {
-    int diff = 0;
-
-    int normalIndex = 0;
-    int noDictionaryindex = 0;
-
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-
-      if (isNoDictionary) {
-        byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
-        ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
-
-        // extract a high card dims from complete byte[].
-        NonDictionaryUtil
-            .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
-
-        byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-
-        ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
-
-        // extract a high card dims from complete byte[].
-        NonDictionaryUtil
-            .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
-
-        int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
-        if (difference != 0) {
-          return difference;
-        }
-        noDictionaryindex++;
-      } else {
-        int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
-        int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
-        diff = dimFieldA - dimFieldB;
-        if (diff != 0) {
-          return diff;
-        }
-        normalIndex++;
-      }
-
-    }
-
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
deleted file mode 100644
index 0883ae1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparatorForNormalDims.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.sort.sortdata;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-
-/**
- * This class is used as comparator for comparing dims which are non high cardinality dims.
- * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
- */
-public class RowComparatorForNormalDims implements Comparator<Object[]> {
-  /**
-   * dimension count
-   */
-  private int numberOfSortColumns;
-
-  /**
-   * RowComparatorForNormalDims Constructor
-   *
-   * @param numberOfSortColumns
-   */
-  public RowComparatorForNormalDims(int numberOfSortColumns) {
-    this.numberOfSortColumns = numberOfSortColumns;
-  }
-
-  /**
-   * Below method will be used to compare two surrogate keys
-   *
-   * @see Comparator#compare(Object, Object)
-   */
-  public int compare(Object[] rowA, Object[] rowB) {
-    int diff = 0;
-
-    for (int i = 0; i < numberOfSortColumns; i++) {
-
-      int dimFieldA = NonDictionaryUtil.getDimension(i, rowA);
-      int dimFieldB = NonDictionaryUtil.getDimension(i, rowB);
-
-      diff = dimFieldA - dimFieldB;
-      if (diff != 0) {
-        return diff;
-      }
-    }
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 88695b9..a4ac0ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -37,6 +37,8 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -71,12 +73,12 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    * tableName
    */
   private String tableName;
-
+  private SortParameters sortParameters;
+  private SortStepRowHandler sortStepRowHandler;
   /**
    * tempFileLocation
    */
   private String[] tempFileLocation;
-  private SortParameters sortParameters;
 
   private int maxThreadForSorting;
 
@@ -89,6 +91,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
     this.tempFileLocation = tempFileLocation;
     this.tableName = tableName;
     this.sortParameters = sortParameters;
+    this.sortStepRowHandler = new SortStepRowHandler(sortParameters);
     try {
       maxThreadForSorting = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.CARBON_MERGE_SORT_READER_THREAD,
@@ -107,8 +110,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    */
   public void startFinalMerge() throws CarbonDataWriterException {
     List<File> filesToMerge = getFilesToMergeSort();
-    if (filesToMerge.size() == 0)
-    {
+    if (filesToMerge.size() == 0) {
       LOGGER.info("No file to merge in final merge stage");
       return;
     }
@@ -125,11 +127,9 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
 
     // get all the merged files
     List<File> files = new ArrayList<File>(tempFileLocation.length);
-    for (String tempLoc : tempFileLocation)
-    {
+    for (String tempLoc : tempFileLocation) {
       File[] subFiles = new File(tempLoc).listFiles(fileFilter);
-      if (null != subFiles && subFiles.length > 0)
-      {
+      if (null != subFiles && subFiles.length > 0) {
         files.addAll(Arrays.asList(subFiles));
       }
     }
@@ -226,13 +226,14 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
   }
 
   /**
-   * This method will be used to get the sorted row
+   * This method will be used to get the sorted sort temp row from the sort temp files
    *
    * @return sorted row
    * @throws CarbonSortKeyAndGroupByException
    */
   public Object[] next() {
-    return getSortedRecordFromFile();
+    IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
+    return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
   }
 
   /**
@@ -241,8 +242,8 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
    * @return sorted record sorted record
    * @throws CarbonSortKeyAndGroupByException
    */
-  private Object[] getSortedRecordFromFile() throws CarbonDataWriterException {
-    Object[] row = null;
+  private IntermediateSortTempRow getSortedRecordFromFile() throws CarbonDataWriterException {
+    IntermediateSortTempRow row = null;
 
     // poll the top object from heap
     // heap maintains binary tree which is based on heap condition that will

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
index 57a19bd..c7efbd9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortDataRows.java
@@ -20,7 +20,7 @@ package org.apache.carbondata.processing.sort.sortdata;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -32,12 +32,10 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -69,7 +67,8 @@ public class SortDataRows {
   private Semaphore semaphore;
 
   private SortParameters parameters;
-
+  private SortStepRowHandler sortStepRowHandler;
+  private ThreadLocal<ByteBuffer> rowBuffer;
   private int sortBufferSize;
 
   private SortIntermediateFileMerger intermediateFileMerger;
@@ -79,7 +78,7 @@ public class SortDataRows {
   public SortDataRows(SortParameters parameters,
       SortIntermediateFileMerger intermediateFileMerger) {
     this.parameters = parameters;
-
+    this.sortStepRowHandler = new SortStepRowHandler(parameters);
     this.intermediateFileMerger = intermediateFileMerger;
 
     int batchSize = CarbonProperties.getInstance().getBatchSize();
@@ -87,6 +86,12 @@ public class SortDataRows {
     this.sortBufferSize = Math.max(parameters.getSortBufferSize(), batchSize);
     // observer of writing file in thread
     this.threadStatusObserver = new ThreadStatusObserver();
+    this.rowBuffer = new ThreadLocal<ByteBuffer>() {
+      @Override protected ByteBuffer initialValue() {
+        byte[] backedArray = new byte[2 * 1024 * 1024];
+        return ByteBuffer.wrap(backedArray);
+      }
+    };
   }
 
   /**
@@ -130,8 +135,7 @@ public class SortDataRows {
         semaphore.acquire();
         dataSorterAndWriterExecutorService.execute(new DataSorterAndWriter(recordHolderListLocal));
       } catch (InterruptedException e) {
-        LOGGER.error(e,
-            "exception occurred while trying to acquire a semaphore lock: ");
+        LOGGER.error(e, "exception occurred while trying to acquire a semaphore lock: ");
         throw new CarbonSortKeyAndGroupByException(e);
       }
       // create the new holder Array
@@ -158,7 +162,7 @@ public class SortDataRows {
         }
         intermediateFileMerger.startMergingIfPossible();
         Object[][] recordHolderListLocal = recordHolderList;
-        sizeLeft = sortBufferSize - entryCount ;
+        sizeLeft = sortBufferSize - entryCount;
         if (sizeLeft > 0) {
           System.arraycopy(rowBatch, 0, recordHolderListLocal, entryCount, sizeLeft);
         }
@@ -212,7 +216,6 @@ public class SortDataRows {
           locationChosen + File.separator + parameters.getTableName() +
               System.nanoTime() + CarbonCommonConstants.SORT_TEMP_FILE_EXT);
       writeDataToFile(recordHolderList, this.entryCount, file);
-
     }
 
     startFileBasedMerge();
@@ -220,7 +223,7 @@ public class SortDataRows {
   }
 
   /**
-   * Below method will be used to write data to file
+   * Below method will be used to write data to sort temp file
    *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
@@ -233,60 +236,9 @@ public class SortDataRows {
           parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName());
       // write number of entries to the file
       stream.writeInt(entryCountLocal);
-      int complexDimColCount = parameters.getComplexDimColCount();
-      int dimColCount = parameters.getDimColCount() + complexDimColCount;
-      DataType[] type = parameters.getMeasureDataType();
-      boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
-      Object[] row = null;
       for (int i = 0; i < entryCountLocal; i++) {
-        // get row from record holder list
-        row = recordHolderList[i];
-        int dimCount = 0;
-        // write dictionary and non dictionary dimensions here.
-        for (; dimCount < noDictionaryDimnesionMapping.length; dimCount++) {
-          if (noDictionaryDimnesionMapping[dimCount]) {
-            byte[] col = (byte[]) row[dimCount];
-            stream.writeShort(col.length);
-            stream.write(col);
-          } else {
-            stream.writeInt((int)row[dimCount]);
-          }
-        }
-        // write complex dimensions here.
-        for (; dimCount < dimColCount; dimCount++) {
-          byte[] value = (byte[])row[dimCount];
-          stream.writeShort(value.length);
-          stream.write(value);
-        }
-        // as measures are stored in separate array.
-        for (int mesCount = 0;
-             mesCount < parameters.getMeasureColCount(); mesCount++) {
-          Object value = row[mesCount + dimColCount];
-          if (null != value) {
-            stream.write((byte) 1);
-            DataType dataType = type[mesCount];
-            if (dataType == DataTypes.BOOLEAN) {
-              stream.writeBoolean((boolean) value);
-            } else if (dataType == DataTypes.SHORT) {
-              stream.writeShort((Short) value);
-            } else if (dataType == DataTypes.INT) {
-              stream.writeInt((Integer) value);
-            } else if (dataType == DataTypes.LONG) {
-              stream.writeLong((Long) value);
-            } else if (dataType == DataTypes.DOUBLE) {
-              stream.writeDouble((Double) value);
-            } else if (DataTypes.isDecimal(dataType)) {
-              BigDecimal val = (BigDecimal) value;
-              byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val);
-              stream.writeInt(bigDecimalInBytes.length);
-              stream.write(bigDecimalInBytes);
-            } else {
-              throw new IllegalArgumentException("unsupported data type:" + type[mesCount]);
-            }
-          } else {
-            stream.write((byte) 0);
-          }
-        }
+        sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToOutputStream(
+            recordHolderList[i], stream, rowBuffer.get());
       }
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
@@ -301,7 +253,7 @@ public class SortDataRows {
    *
    * @throws CarbonSortKeyAndGroupByException
    */
-  public void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
+  private void deleteSortLocationIfExists() throws CarbonSortKeyAndGroupByException {
     CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation());
   }
 
@@ -380,7 +332,8 @@ public class SortDataRows {
         // intermediate merging of sort temp files will be triggered
         intermediateFileMerger.addFileToMerge(sortTempFile);
         LOGGER.info("Time taken to sort and write sort temp file " + sortTempFile + " is: " + (
-            System.currentTimeMillis() - startTime));
+            System.currentTimeMillis() - startTime) + ", sort temp file size in MB is "
+            + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
       } catch (Throwable e) {
         try {
           threadStatusObserver.notifyFailed(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index d726539..7e221a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -30,14 +31,11 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
@@ -71,20 +69,13 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   /**
    * return row
    */
-  private Object[] returnRow;
-  private int dimCnt;
-  private int noDictDimCnt;
-  private int complexCnt;
-  private int measureCnt;
-  private boolean[] isNoDictionaryDimensionColumn;
-  private boolean[] isNoDictionarySortColumn;
-  private DataType[] measureDataTypes;
+  private IntermediateSortTempRow returnRow;
   private int readBufferSize;
   private String compressorName;
 
-  private Object[][] currentBuffer;
+  private IntermediateSortTempRow[] currentBuffer;
 
-  private Object[][] backupBuffer;
+  private IntermediateSortTempRow[] backupBuffer;
 
   private boolean isBackupFilled;
 
@@ -104,7 +95,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * totalRecordFetch
    */
   private int totalRecordFetch;
-
+  private TableFieldStat tableFieldStat;
+  private SortStepRowHandler sortStepRowHandler;
+  private Comparator<IntermediateSortTempRow> comparator;
   /**
    * Constructor to initialize
    *
@@ -115,16 +108,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) {
     // set temp file
     this.tempFile = tempFile;
-    this.dimCnt = sortParameters.getDimColCount();
-    this.noDictDimCnt = sortParameters.getNoDictionaryCount();
-    this.complexCnt = sortParameters.getComplexDimColCount();
-    this.measureCnt = sortParameters.getMeasureColCount();
-    this.isNoDictionaryDimensionColumn = sortParameters.getNoDictionaryDimnesionColumn();
-    this.isNoDictionarySortColumn = sortParameters.getNoDictionarySortColumn();
-    this.measureDataTypes = sortParameters.getMeasureDataType();
     this.readBufferSize = sortParameters.getBufferSize();
     this.compressorName = sortParameters.getSortTempCompressorName();
-
+    this.tableFieldStat = new TableFieldStat(sortParameters);
+    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
+    this.comparator = new IntermediateSortTempRowComparator(
+        tableFieldStat.getIsSortColNoDictFlags());
     this.executorService = Executors
         .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
   }
@@ -178,7 +167,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     if (prefetch) {
       fillDataForPrefetch();
     } else {
-      this.returnRow = getRowFromStream();
+      try {
+        this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+        this.numberOfObjectRead++;
+      } catch (IOException e) {
+        throw new CarbonSortKeyAndGroupByException("Problem while reading rows", e);
+      }
     }
   }
 
@@ -212,86 +206,28 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   }
 
   /**
-   * Reads row from file
+   * Read a batch of rows from stream
+   *
    * @return Object[]
-   * @throws CarbonSortKeyAndGroupByException
+   * @throws IOException if error occurs while reading from stream
    */
-  private Object[] getRowFromStream() throws CarbonSortKeyAndGroupByException {
-    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-
-    Object[] holder = new Object[3];
-    int index = 0;
-    int nonDicIndex = 0;
-    int[] dim = new int[dimCnt - noDictDimCnt];
-    byte[][] nonDicArray = new byte[noDictDimCnt + complexCnt][];
-    Object[] measures = new Object[measureCnt];
-    try {
-      // read dimension values
-      for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
-        if (isNoDictionaryDimensionColumn[i]) {
-          short len = stream.readShort();
-          byte[] array = new byte[len];
-          stream.readFully(array);
-          nonDicArray[nonDicIndex++] = array;
-        } else {
-          dim[index++] = stream.readInt();
-        }
-      }
-
-      for (int i = 0; i < complexCnt; i++) {
-        short len = stream.readShort();
-        byte[] array = new byte[len];
-        stream.readFully(array);
-        nonDicArray[nonDicIndex++] = array;
-      }
-
-      index = 0;
-      // read measure values
-      for (int i = 0; i < measureCnt; i++) {
-        if (stream.readByte() == 1) {
-          DataType dataType = measureDataTypes[i];
-          if (dataType == DataTypes.BOOLEAN) {
-            measures[index++] = stream.readBoolean();
-          } else if (dataType == DataTypes.SHORT) {
-            measures[index++] = stream.readShort();
-          } else if (dataType == DataTypes.INT) {
-            measures[index++] = stream.readInt();
-          } else if (dataType == DataTypes.LONG) {
-            measures[index++] = stream.readLong();
-          } else if (dataType == DataTypes.DOUBLE) {
-            measures[index++] = stream.readDouble();
-          } else if (DataTypes.isDecimal(dataType)) {
-            int len = stream.readInt();
-            byte[] buff = new byte[len];
-            stream.readFully(buff);
-            measures[index++] = DataTypeUtil.byteToBigDecimal(buff);
-          } else {
-            throw new IllegalArgumentException("unsupported data type:" + dataType);
-          }
-        } else {
-          measures[index++] = null;
-        }
-      }
-
-      NonDictionaryUtil.prepareOutObj(holder, dim, nonDicArray, measures);
-
-      // increment number if record read
-      this.numberOfObjectRead++;
-    } catch (IOException e) {
-      LOGGER.error("Problme while reading the madkey fom sort temp file");
-      throw new CarbonSortKeyAndGroupByException("Problem while reading the sort temp file ", e);
+  private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) throws IOException {
+    IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
+    for (int i = 0; i < expected; i++) {
+      IntermediateSortTempRow holder
+          = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+      holders[i] = holder;
     }
-
-    //return out row
-    return holder;
+    this.numberOfObjectRead += expected;
+    return holders;
   }
 
   /**
-   * below method will be used to get the row
+   * below method will be used to get the sort temp row
    *
    * @return row
    */
-  public Object[] getRow() {
+  public IntermediateSortTempRow getRow() {
     return this.returnRow;
   }
 
@@ -330,31 +266,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   }
 
   @Override public int compareTo(SortTempFileChunkHolder other) {
-    int diff = 0;
-    int index = 0;
-    int noDictionaryIndex = 0;
-    int[] leftMdkArray = (int[]) returnRow[0];
-    int[] rightMdkArray = (int[]) other.returnRow[0];
-    byte[][] leftNonDictArray = (byte[][]) returnRow[1];
-    byte[][] rightNonDictArray = (byte[][]) other.returnRow[1];
-    for (boolean isNoDictionary : isNoDictionarySortColumn) {
-      if (isNoDictionary) {
-        diff = UnsafeComparer.INSTANCE
-            .compareTo(leftNonDictArray[noDictionaryIndex], rightNonDictArray[noDictionaryIndex]);
-        if (diff != 0) {
-          return diff;
-        }
-        noDictionaryIndex++;
-      } else {
-        diff = leftMdkArray[index] - rightMdkArray[index];
-        if (diff != 0) {
-          return diff;
-        }
-        index++;
-      }
-
-    }
-    return diff;
+    return comparator.compare(returnRow, other.getRow());
   }
 
   @Override public boolean equals(Object obj) {
@@ -372,9 +284,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
 
   @Override public int hashCode() {
     int hash = 0;
-    hash += 31 * measureCnt;
-    hash += 31 * dimCnt;
-    hash += 31 * complexCnt;
+    hash += tableFieldStat.hashCode();
     hash += tempFile.hashCode();
     return hash;
   }
@@ -414,16 +324,12 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   /**
    * This method will read the records from sort temp file and keep it in a buffer
    *
-   * @param numberOfRecords
-   * @return
-   * @throws CarbonSortKeyAndGroupByException
+   * @param numberOfRecords number of records to be read
+   * @return batch of intermediate sort temp rows
+   * @throws IOException if error occurs while reading records
    */
-  private Object[][] prefetchRecordsFromFile(int numberOfRecords)
-      throws CarbonSortKeyAndGroupByException {
-    Object[][] records = new Object[numberOfRecords][];
-    for (int i = 0; i < numberOfRecords; i++) {
-      records[i] = getRowFromStream();
-    }
-    return records;
+  private IntermediateSortTempRow[] prefetchRecordsFromFile(int numberOfRecords)
+      throws IOException {
+    return readBatchedRowFromStream(numberOfRecords);
   }
 }
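The hand-written comparison that used to live in compareTo (removed above) is now delegated to IntermediateSortTempRowComparator, whose source is not part of this hunk. As a rough guide only, the comparison it presumably performs can be reconstructed from the removed logic: dictionary sort columns compare as int surrogate keys, no-dictionary sort columns compare byte-wise. A sketch, not the committed implementation:

import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;

// Sketch: compare two intermediate sort temp rows on their sort columns only.
// isSortColNoDict is assumed to come from TableFieldStat.getIsSortColNoDictFlags().
static int compareSortColumns(IntermediateSortTempRow left, IntermediateSortTempRow right,
    boolean[] isSortColNoDict) {
  int dictIdx = 0;
  int noDictIdx = 0;
  for (boolean noDict : isSortColNoDict) {
    int diff;
    if (noDict) {
      // no-dictionary sort column: byte-wise comparison
      diff = UnsafeComparer.INSTANCE.compareTo(
          left.getNoDictSortDims()[noDictIdx], right.getNoDictSortDims()[noDictIdx]);
      noDictIdx++;
    } else {
      // dictionary sort column: surrogate keys compare as ints
      diff = left.getDictSortDims()[dictIdx] - right.getDictSortDims()[dictIdx];
      dictIdx++;
    }
    if (diff != 0) {
      return diff;
    }
  }
  return 0;
}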

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
new file mode 100644
index 0000000..0d1303a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * This class is used to hold field information for a table during data loading. This information
+ * is used to convert/construct/destruct rows in the sort process step. Because complex fields are
+ * processed the same way as no-dict & no-sort simple dimensions, they are treated alike and
+ * represented by the `no-dict-no-sort-dim` related variables in this class.
+ */
+public class TableFieldStat implements Serializable {
+  private static final long serialVersionUID = 201712070950L;
+  private int dictSortDimCnt = 0;
+  private int dictNoSortDimCnt = 0;
+  private int noDictSortDimCnt = 0;
+  private int noDictNoSortDimCnt = 0;
+  // for each sort column, whether it is a no-dictionary column
+  private boolean[] isSortColNoDictFlags;
+  private int measureCnt;
+  private DataType[] measureDataType;
+
+  // indices for dict & sort dimension columns
+  private int[] dictSortDimIdx;
+  // indices for dict & no-sort dimension columns
+  private int[] dictNoSortDimIdx;
+  // indices for no-dict & sort dimension columns
+  private int[] noDictSortDimIdx;
+  // indices for no-dict & no-sort dimension columns, including complex columns
+  private int[] noDictNoSortDimIdx;
+  // indices for measure columns
+  private int[] measureIdx;
+
+  public TableFieldStat(SortParameters sortParameters) {
+    int noDictDimCnt = sortParameters.getNoDictionaryCount();
+    int complexDimCnt = sortParameters.getComplexDimColCount();
+    int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
+    this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
+    int sortColCnt = isSortColNoDictFlags.length;
+    for (boolean flag : isSortColNoDictFlags) {
+      if (flag) {
+        noDictSortDimCnt++;
+      } else {
+        dictSortDimCnt++;
+      }
+    }
+    this.measureCnt = sortParameters.getMeasureColCount();
+    this.measureDataType = sortParameters.getMeasureDataType();
+
+    // be careful that the default value is 0
+    this.dictSortDimIdx = new int[dictSortDimCnt];
+    this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt];
+    this.noDictSortDimIdx = new int[noDictSortDimCnt];
+    this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt];
+    this.measureIdx = new int[measureCnt];
+
+    int tmpNoDictSortCnt = 0;
+    int tmpNoDictNoSortCnt = 0;
+    int tmpDictSortCnt = 0;
+    int tmpDictNoSortCnt = 0;
+    boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
+
+    for (int i = 0; i < isDimNoDictFlags.length; i++) {
+      if (isDimNoDictFlags[i]) {
+        if (i < sortColCnt && isSortColNoDictFlags[i]) {
+          noDictSortDimIdx[tmpNoDictSortCnt++] = i;
+        } else {
+          noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
+        }
+      } else {
+        if (i < sortColCnt && !isSortColNoDictFlags[i]) {
+          dictSortDimIdx[tmpDictSortCnt++] = i;
+        } else {
+          dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+        }
+      }
+    }
+    dictNoSortDimCnt = tmpDictNoSortCnt;
+
+    int base = isDimNoDictFlags.length;
+    // adding complex dimension columns
+    for (int i = 0; i < complexDimCnt; i++) {
+      noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = base + i;
+    }
+    noDictNoSortDimCnt = tmpNoDictNoSortCnt;
+
+    base += complexDimCnt;
+    // indices for measure columns
+    for (int i = 0; i < measureCnt; i++) {
+      measureIdx[i] = base + i;
+    }
+  }
+
+  public int getDictSortDimCnt() {
+    return dictSortDimCnt;
+  }
+
+  public int getDictNoSortDimCnt() {
+    return dictNoSortDimCnt;
+  }
+
+  public int getNoDictSortDimCnt() {
+    return noDictSortDimCnt;
+  }
+
+  public int getNoDictNoSortDimCnt() {
+    return noDictNoSortDimCnt;
+  }
+
+  public boolean[] getIsSortColNoDictFlags() {
+    return isSortColNoDictFlags;
+  }
+
+  public int getMeasureCnt() {
+    return measureCnt;
+  }
+
+  public DataType[] getMeasureDataType() {
+    return measureDataType;
+  }
+
+  public int[] getDictSortDimIdx() {
+    return dictSortDimIdx;
+  }
+
+  public int[] getDictNoSortDimIdx() {
+    return dictNoSortDimIdx;
+  }
+
+  public int[] getNoDictSortDimIdx() {
+    return noDictSortDimIdx;
+  }
+
+  public int[] getNoDictNoSortDimIdx() {
+    return noDictNoSortDimIdx;
+  }
+
+  public int[] getMeasureIdx() {
+    return measureIdx;
+  }
+
+  @Override public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof TableFieldStat)) return false;
+    TableFieldStat that = (TableFieldStat) o;
+    return dictSortDimCnt == that.dictSortDimCnt
+        && dictNoSortDimCnt == that.dictNoSortDimCnt
+        && noDictSortDimCnt == that.noDictSortDimCnt
+        && noDictNoSortDimCnt == that.noDictNoSortDimCnt
+        && measureCnt == that.measureCnt;
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
+        noDictNoSortDimCnt, measureCnt);
+  }
+}
\ No newline at end of file
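To make the index bookkeeping in TableFieldStat concrete, here is a worked example for a hypothetical schema (not taken from the patch): a raw row ordered as [d1(dict, sort), n1(no-dict, sort), d2(dict, no-sort), c1(complex), m1(measure)], i.e. isDimNoDictFlags = {false, true, false} and isSortColNoDictFlags = {false, true}. The constructor above would then compute:

  dictSortDimIdx     = {0}   // d1
  noDictSortDimIdx   = {1}   // n1
  dictNoSortDimIdx   = {2}   // d2
  noDictNoSortDimIdx = {3}   // c1, complex columns are appended after the plain dimensions
  measureIdx         = {4}   // m1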


[2/2] carbondata git commit: [CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort temp row

Posted by ja...@apache.org.
[CARBONDATA-2018][DataLoad] Optimization in reading/writing for sort temp row

Pick up the no-sort fields in the row, pack them as a byte array, and skip parsing them during merge sort to reduce CPU consumption

This closes #1792
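In other words, everything the merge sort never compares (dict & no-sort dimensions, no-dict & no-sort dimensions including complex, and the measures) is serialized once into an opaque byte array per row and carried through the merge untouched. A minimal sketch of the packing step, assuming index arrays like the ones built by the new TableFieldStat class and an arbitrarily sized reusable buffer:

import java.nio.ByteBuffer;

// Sketch only: pack the fields the merge sort never compares into one byte array.
// dictNoSortDimIdx / noDictNoSortDimIdx mirror the index arrays built by TableFieldStat.
static byte[] packNoSortFields(Object[] row, int[] dictNoSortDimIdx,
    int[] noDictNoSortDimIdx, ByteBuffer rowBuffer) {
  rowBuffer.clear();
  for (int idx : dictNoSortDimIdx) {            // dict & no-sort dims: 4-byte surrogate keys
    rowBuffer.putInt((int) row[idx]);
  }
  for (int idx : noDictNoSortDimIdx) {          // no-dict & no-sort dims (incl. complex)
    byte[] bytes = (byte[]) row[idx];
    rowBuffer.putShort((short) bytes.length);   // length-prefixed bytes
    rowBuffer.put(bytes);
  }
  // in the real patch the measures follow here, each with a 1-byte null flag
  // (see SortStepRowHandler.packNoSortFieldsToBytes below)
  rowBuffer.flip();
  byte[] packed = new byte[rowBuffer.limit()];
  rowBuffer.get(packed);
  return packed;
}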


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/de92ea9a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/de92ea9a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/de92ea9a

Branch: refs/heads/carbonstore
Commit: de92ea9a123b17d903f2d1d4662299315c792954
Parents: cd7eed6
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Thu Feb 8 14:35:14 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Feb 9 01:01:22 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/util/NonDictionaryUtil.java |  67 +--
 .../presto/util/CarbonDataStoreCreator.scala    |   1 -
 .../load/DataLoadProcessorStepOnSpark.scala     |   6 +-
 .../loading/row/IntermediateSortTempRow.java    | 117 +++++
 .../loading/sort/SortStepRowHandler.java        | 466 +++++++++++++++++++
 .../loading/sort/SortStepRowUtil.java           | 103 ----
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 331 ++-----------
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  57 +--
 .../unsafe/comparator/UnsafeRowComparator.java  |  95 ++--
 .../UnsafeRowComparatorForNormalDIms.java       |  59 ---
 .../UnsafeRowComparatorForNormalDims.java       |  59 +++
 .../sort/unsafe/holder/SortTempChunkHolder.java |   3 +-
 .../holder/UnsafeFinalMergePageHolder.java      |  19 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java     |  21 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   | 138 ++----
 .../merger/UnsafeIntermediateFileMerger.java    | 118 +----
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  27 +-
 .../merger/CompactionResultSortProcessor.java   |   1 -
 .../sort/sortdata/IntermediateFileMerger.java   |  95 +---
 .../IntermediateSortTempRowComparator.java      |  73 +++
 .../sort/sortdata/NewRowComparator.java         |   5 +-
 .../sortdata/NewRowComparatorForNormalDims.java |   3 +-
 .../processing/sort/sortdata/RowComparator.java |  94 ----
 .../sortdata/RowComparatorForNormalDims.java    |  62 ---
 .../SingleThreadFinalSortFilesMerger.java       |  25 +-
 .../processing/sort/sortdata/SortDataRows.java  |  85 +---
 .../sort/sortdata/SortTempFileChunkHolder.java  | 174 ++-----
 .../sort/sortdata/TableFieldStat.java           | 176 +++++++
 28 files changed, 1186 insertions(+), 1294 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
index d6ecfbc..fca1244 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
@@ -82,18 +82,26 @@ public class NonDictionaryUtil {
   }
 
   /**
-   * Method to get the required Dimension from obj []
+   * Method to get the required dictionary Dimension from obj []
    *
    * @param index
    * @param row
    * @return
    */
-  public static Integer getDimension(int index, Object[] row) {
-
-    Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
-
+  public static int getDictDimension(int index, Object[] row) {
+    int[] dimensions = (int[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
     return dimensions[index];
+  }
 
+  /**
+   * Method to get the required no-dictionary or complex dimension from the 3-parted row
+   * @param index
+   * @param row
+   * @return
+   */
+  public static byte[] getNoDictOrComplex(int index, Object[] row) {
+    byte[][] nonDictArray = (byte[][]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+    return nonDictArray[index];
   }
 
   /**
@@ -108,60 +116,11 @@ public class NonDictionaryUtil {
     return measures[index];
   }
 
-  public static byte[] getByteArrayForNoDictionaryCols(Object[] row) {
-
-    return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-  }
-
   public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
       Object[] measureArray) {
-
     out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
     out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;
     out[WriteStepRowUtil.MEASURE] = measureArray;
 
   }
-
-  /**
-   * This method will extract the single dimension from the complete high card dims byte[].+     *
-   * The format of the byte [] will be,  Totallength,CompleteStartOffsets,Dat
-   *
-   * @param highCardArr
-   * @param index
-   * @param highCardinalityCount
-   * @param outBuffer
-   */
-  public static void extractSingleHighCardDims(byte[] highCardArr, int index,
-      int highCardinalityCount, ByteBuffer outBuffer) {
-    ByteBuffer buff = null;
-    short secIndex = 0;
-    short firstIndex = 0;
-    int length;
-    // if the requested index is a last one then we need to calculate length
-    // based on byte[] length.
-    if (index == highCardinalityCount - 1) {
-      // need to read 2 bytes(1 short) to determine starting offset and
-      // length can be calculated by array length.
-      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2);
-    } else {
-      // need to read 4 bytes(2 short) to determine starting offset and
-      // length.
-      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4);
-    }
-
-    firstIndex = buff.getShort();
-    // if it is a last dimension in high card then this will be last
-    // offset.so calculate length from total length
-    if (index == highCardinalityCount - 1) {
-      secIndex = (short) highCardArr.length;
-    } else {
-      secIndex = buff.getShort();
-    }
-
-    length = secIndex - firstIndex;
-
-    outBuffer.position(firstIndex);
-    outBuffer.limit(outBuffer.position() + length);
-
-  }
 }
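A small, hypothetical round trip through the 3-parted row helpers kept in this class (the literal values are made up for illustration):

import java.nio.charset.StandardCharsets;
import org.apache.carbondata.core.util.NonDictionaryUtil;

// Assemble a 3-parted row and read individual fields back.
Object[] outRow = new Object[3];
int[] dictDims = new int[] { 1, 42 };
byte[][] noDictAndComplex = new byte[][] { "abc".getBytes(StandardCharsets.UTF_8) };
Object[] measures = new Object[] { 12.5d };
NonDictionaryUtil.prepareOutObj(outRow, dictDims, noDictAndComplex, measures);

int secondDictKey = NonDictionaryUtil.getDictDimension(1, outRow);     // 42
byte[] firstNoDict = NonDictionaryUtil.getNoDictOrComplex(0, outRow);  // bytes of "abc"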

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 1d7c791..7203278 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -383,7 +383,6 @@ object CarbonDataStoreCreator {
       .getInstance.createCache(CacheType.REVERSE_DICTIONARY)
 
     for (i <- set.indices) {
-      //      val dim = getDimension(dims, i).get
       val columnIdentifier: ColumnIdentifier =
         new ColumnIdentifier(dims.get(i).getColumnId, null, null)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 834c1a6..efa61c7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
-import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler
 import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
@@ -152,7 +152,7 @@ object DataLoadProcessorStepOnSpark {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
     val sortParameters = SortParameters.createSortParameters(conf)
-    val sortStepRowUtil = new SortStepRowUtil(sortParameters)
+    val sortStepRowHandler = new SortStepRowHandler(sortParameters)
     TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
       wrapException(e, model)
     }
@@ -162,7 +162,7 @@ object DataLoadProcessorStepOnSpark {
 
       override def next(): CarbonRow = {
         val row =
-          new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
+          new CarbonRow(sortStepRowHandler.convertRawRowTo3Parts(rows.next().getData))
         rowCounter.add(1)
         row
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
new file mode 100644
index 0000000..8d351cf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.row;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/**
+ * During the sort procedure, each row will be written to the sort temp file in this logical format.
+ * An intermediate sort temp row consists of 3 parts:
+ * dictSort, noDictSort, noSortDimsAndMeasures(dictNoSort, noDictNoSort, measure)
+ */
+public class IntermediateSortTempRow {
+  private int[] dictSortDims;
+  private byte[][] noDictSortDims;
+  private byte[] noSortDimsAndMeasures;
+
+  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+      byte[] noSortDimsAndMeasures) {
+    this.dictSortDims = dictSortDims;
+    this.noDictSortDims = noDictSortDims;
+    this.noSortDimsAndMeasures = noSortDimsAndMeasures;
+  }
+
+  public int[] getDictSortDims() {
+    return dictSortDims;
+  }
+
+  public byte[][] getNoDictSortDims() {
+    return noDictSortDims;
+  }
+
+  public byte[] getNoSortDimsAndMeasures() {
+    return noSortDimsAndMeasures;
+  }
+
+  /**
+   * deserialize the packed byte array to get the no-sort fields
+   * @param outDictNoSort stores the dict & no-sort fields
+   * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex
+   * @param outMeasures stores the measure fields
+   * @param dataTypes data type for the measure
+   */
+  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
+      Object[] outMeasures, DataType[] dataTypes) {
+    ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
+    // read dict_no_sort
+    int dictNoSortCnt = outDictNoSort.length;
+    for (int i = 0; i < dictNoSortCnt; i++) {
+      outDictNoSort[i] = rowBuffer.getInt();
+    }
+
+    // read no_dict_no_sort (including complex)
+    int noDictNoSortCnt = outNoDictNoSort.length;
+    for (int i = 0; i < noDictNoSortCnt; i++) {
+      short len = rowBuffer.getShort();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      outNoDictNoSort[i] = bytes;
+    }
+
+    // read measure
+    int measureCnt = outMeasures.length;
+    DataType tmpDataType;
+    Object tmpContent;
+    for (short idx = 0 ; idx < measureCnt; idx++) {
+      if ((byte) 0 == rowBuffer.get()) {
+        outMeasures[idx] = null;
+        continue;
+      }
+
+      tmpDataType = dataTypes[idx];
+      if (DataTypes.BOOLEAN == tmpDataType) {
+        if ((byte) 1 == rowBuffer.get()) {
+          tmpContent = true;
+        } else {
+          tmpContent = false;
+        }
+      } else if (DataTypes.SHORT == tmpDataType) {
+        tmpContent = rowBuffer.getShort();
+      } else if (DataTypes.INT == tmpDataType) {
+        tmpContent = rowBuffer.getInt();
+      } else if (DataTypes.LONG == tmpDataType) {
+        tmpContent = rowBuffer.getLong();
+      } else if (DataTypes.DOUBLE == tmpDataType) {
+        tmpContent = rowBuffer.getDouble();
+      } else if (DataTypes.isDecimal(tmpDataType)) {
+        short len = rowBuffer.getShort();
+        byte[] decimalBytes = new byte[len];
+        rowBuffer.get(decimalBytes);
+        tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+      } else {
+        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+      }
+      outMeasures[idx] = tmpContent;
+    }
+  }
+
+
+}
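A brief sketch of how a consumer (for example the final merger) might unpack the no-sort part, sizing the output arrays from TableFieldStat; the helper name and wiring are illustrative, not part of the patch:

import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;

// Sketch: materialize the packed no-sort fields of one row after the merge.
static Object[] unpackMeasures(IntermediateSortTempRow row, TableFieldStat stat) {
  int[] dictNoSortDims = new int[stat.getDictNoSortDimCnt()];
  byte[][] noDictNoSortDims = new byte[stat.getNoDictNoSortDimCnt()][];
  Object[] measures = new Object[stat.getMeasureCnt()];
  row.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures,
      stat.getMeasureDataType());
  // the sort columns never needed unpacking: row.getDictSortDims(), row.getNoDictSortDims()
  return measures;
}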

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
new file mode 100644
index 0000000..f31a2b9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
+
+/**
+ * This class is used to convert/write/read rows in the sort step in carbondata.
+ * It provides the following functions:
+ * 1. convert a raw row & an intermediate sort temp row to a 3-parted row
+ * 2. read/write an intermediate sort temp row from/to sort temp file & unsafe memory
+ * 3. write a raw row directly to sort temp file & unsafe memory as an intermediate sort temp row
+ */
+public class SortStepRowHandler implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private int dictSortDimCnt = 0;
+  private int dictNoSortDimCnt = 0;
+  private int noDictSortDimCnt = 0;
+  private int noDictNoSortDimCnt = 0;
+  private int measureCnt;
+
+  // indices for dict & sort dimension columns
+  private int[] dictSortDimIdx;
+  // indices for dict & no-sort dimension columns
+  private int[] dictNoSortDimIdx;
+  // indices for no-dict & sort dimension columns
+  private int[] noDictSortDimIdx;
+  // indices for no-dict & no-sort dimension columns, including complex columns
+  private int[] noDictNoSortDimIdx;
+  // indices for measure columns
+  private int[] measureIdx;
+
+  private DataType[] dataTypes;
+
+  /**
+   * constructor
+   * @param tableFieldStat table field stat
+   */
+  public SortStepRowHandler(TableFieldStat tableFieldStat) {
+    this.dictSortDimCnt = tableFieldStat.getDictSortDimCnt();
+    this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
+    this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
+    this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
+    this.measureCnt = tableFieldStat.getMeasureCnt();
+    this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
+    this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
+    this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
+    this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
+    this.measureIdx = tableFieldStat.getMeasureIdx();
+    this.dataTypes = tableFieldStat.getMeasureDataType();
+  }
+
+  /**
+   * constructor
+   * @param sortParameters sort parameters
+   */
+  public SortStepRowHandler(SortParameters sortParameters) {
+    this(new TableFieldStat(sortParameters));
+  }
+
+  /**
+   * Convert carbon row from raw format to 3-parted format.
+   * This method is used in global-sort.
+   *
+   * @param row raw row whose length is the same as field number
+   * @return 3-parted row whose length is 3 (1 for dict dims, 1 for non-dict and complex,
+   * 1 for measures)
+   */
+  public Object[] convertRawRowTo3Parts(Object[] row) {
+    Object[] holder = new Object[3];
+    try {
+      int[] dictDims
+          = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
+      byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
+      Object[] measures = new Object[this.measureCnt];
+
+      // convert dict & data
+      int idxAcc = 0;
+      for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+        dictDims[idxAcc++] = (int) row[this.dictSortDimIdx[idx]];
+      }
+
+      // convert dict & no-sort
+      for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
+        dictDims[idxAcc++] = (int) row[this.dictNoSortDimIdx[idx]];
+      }
+      // convert no-dict & sort
+      idxAcc = 0;
+      for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+        nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]];
+      }
+      // convert no-dict & no-sort
+      for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
+        nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+      }
+
+      // convert measure data
+      for (int idx = 0; idx < this.measureCnt; idx++) {
+        measures[idx] = row[this.measureIdx[idx]];
+      }
+
+      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
+    } catch (Exception e) {
+      throw new RuntimeException("Problem while converting row to 3 parts", e);
+    }
+    return holder;
+  }
+
+  /**
+   * Convert intermediate sort temp row to 3-parted row.
+   * This method is used in the final merge sort to feed rows to the next write step.
+   *
+   * @param sortTempRow intermediate sort temp row
+   * @return 3-parted row
+   */
+  public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
+    int[] dictDims
+        = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
+    byte[][] noDictArray
+        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
+
+    int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
+    byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][];
+    Object[] measures = new Object[this.measureCnt];
+
+    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes);
+
+    // dict dims
+    System.arraycopy(sortTempRow.getDictSortDims(), 0, dictDims,
+        0, this.dictSortDimCnt);
+    System.arraycopy(dictNoSortDims, 0, dictDims,
+        this.dictSortDimCnt, this.dictNoSortDimCnt);
+
+    // no dict dims, including complex
+    System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
+        noDictArray, 0, this.noDictSortDimCnt);
+    System.arraycopy(noDictNoSortDims, 0, noDictArray,
+        this.noDictSortDimCnt, this.noDictNoSortDimCnt);
+
+    // measures are already here
+
+    Object[] holder = new Object[3];
+    NonDictionaryUtil.prepareOutObj(holder, dictDims, noDictArray, measures);
+    return holder;
+  }
+
+  /**
+   * Read intermediate sort temp row from InputStream.
+   * This method is used during the merge sort phase to read a row from the sort temp file.
+   *
+   * @param inputStream input stream
+   * @return a row that contains three parts
+   * @throws IOException if error occurs while reading from stream
+   */
+  public IntermediateSortTempRow readIntermediateSortTempRowFromInputStream(
+      DataInputStream inputStream) throws IOException {
+    int[] dictSortDims = new int[this.dictSortDimCnt];
+    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
+
+    // read dict & sort dim data
+    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+      dictSortDims[idx] = inputStream.readInt();
+    }
+
+    // read no-dict & sort data
+    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+      short len = inputStream.readShort();
+      byte[] bytes = new byte[len];
+      inputStream.readFully(bytes);
+      noDictSortDims[idx] = bytes;
+    }
+
+    // read no-dict dims & measures
+    int len = inputStream.readInt();
+    byte[] noSortDimsAndMeasures = new byte[len];
+    inputStream.readFully(noSortDimsAndMeasures);
+
+    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
+  }
+
+  /**
+   * Write intermediate sort temp row to OutputStream
+   * This method is used during the merge sort phase to write row to sort temp file.
+   *
+   * @param sortTempRow intermediate sort temp row
+   * @param outputStream output stream
+   * @throws IOException if error occurs while writing to stream
+   */
+  public void writeIntermediateSortTempRowToOutputStream(IntermediateSortTempRow sortTempRow,
+      DataOutputStream outputStream) throws IOException {
+    // write dict & sort dim
+    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+      outputStream.writeInt(sortTempRow.getDictSortDims()[idx]);
+    }
+
+    // write no-dict & sort dim
+    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+      byte[] bytes = sortTempRow.getNoDictSortDims()[idx];
+      outputStream.writeShort(bytes.length);
+      outputStream.write(bytes);
+    }
+
+    // write packed no-sort dim & measure
+    outputStream.writeInt(sortTempRow.getNoSortDimsAndMeasures().length);
+    outputStream.write(sortTempRow.getNoSortDimsAndMeasures());
+  }
+
+  /**
+   * Write raw row as an intermediate sort temp row to sort temp file.
+   * This method is used at the beginning of the sort phase. Compared with converting the raw row
+   * to an intermediate sort temp row and then writing the converted one, writing the raw row
+   * directly avoids the trivial intermediate conversion overhead.
+   * This method uses an array-backed buffer to save memory allocation. The buffer is reused
+   * for all rows (per thread).
+   *
+   * @param row raw row
+   * @param outputStream output stream
+   * @param rowBuffer array-backed buffer
+   * @throws IOException if error occurs while writing to stream
+   */
+  public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
+      DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException {
+    // write dict & sort
+    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+      outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]);
+    }
+
+    // write no-dict & sort
+    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+      outputStream.writeShort(bytes.length);
+      outputStream.write(bytes);
+    }
+
+    // pack no-sort
+    rowBuffer.clear();
+    packNoSortFieldsToBytes(row, rowBuffer);
+    rowBuffer.flip();
+    int packSize = rowBuffer.limit();
+
+    // write no-sort
+    outputStream.writeInt(packSize);
+    outputStream.write(rowBuffer.array(), 0, packSize);
+  }
+
+  /**
+   * Read intermediate sort temp row from unsafe memory.
+   * This method is used during merge sort phase for off-heap sort.
+   *
+   * @param baseObject base object of memory block
+   * @param address address of the row
+   * @return intermediate sort temp row
+   */
+  public IntermediateSortTempRow readIntermediateSortTempRowFromUnsafeMemory(Object baseObject,
+      long address) {
+    int size = 0;
+
+    int[] dictSortDims = new int[this.dictSortDimCnt];
+    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
+
+    // read dict & sort dim
+    for (int idx = 0; idx < dictSortDims.length; idx++) {
+      dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+      size += 4;
+    }
+
+    // read no-dict & sort dim
+    for (int idx = 0; idx < noDictSortDims.length; idx++) {
+      short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+      size += 2;
+      byte[] bytes = new byte[length];
+      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      size += length;
+      noDictSortDims[idx] = bytes;
+    }
+
+    // read no-sort dims & measures
+    int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+    size += 4;
+    byte[] noSortDimsAndMeasures = new byte[len];
+    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+        noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
+
+    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
+  }
+
+  /**
+   * Write intermediate sort temp row directly from unsafe memory to stream.
+   * This method is used during the sort phase to write in-memory pages to the sort temp file.
+   * Compared with reading the intermediate sort temp row from memory and then writing it,
+   * writing directly from memory to the stream avoids the trivial intermediate conversion overhead.
+   *
+   * @param baseObject base object of the memory block
+   * @param address base address of the row
+   * @param outputStream output stream
+   * @throws IOException if error occurs while writing to stream
+   */
+  public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject,
+      long address, DataOutputStream outputStream) throws IOException {
+    int size = 0;
+
+    // dict & sort
+    for (int idx = 0; idx < dictSortDimCnt; idx++) {
+      outputStream.writeInt(CarbonUnsafe.getUnsafe().getInt(baseObject, address + size));
+      size += 4;
+    }
+
+    // no-dict & sort
+    for (int idx = 0; idx < noDictSortDimCnt; idx++) {
+      short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+      size += 2;
+      byte[] bytes = new byte[length];
+      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      size += length;
+
+      outputStream.writeShort(length);
+      outputStream.write(bytes);
+    }
+
+    // packed no-sort & measure
+    int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+    size += 4;
+    byte[] noSortDimsAndMeasures = new byte[len];
+    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+        noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
+    size += len;
+
+    outputStream.writeInt(len);
+    outputStream.write(noSortDimsAndMeasures);
+  }
+
+  /**
+   * Write raw row as an intermediate sort temp row to memory.
+   * This method is used at the beginning of the off-heap sort phase. Compared with converting
+   * the raw row to an intermediate sort temp row and then writing the converted one,
+   * writing the raw row directly avoids the trivial intermediate conversion overhead.
+   * This method uses an array-backed buffer to save memory allocation. The buffer is reused
+   * for all rows (per thread).
+   *
+   * @param row raw row
+   * @param baseObject base object of the memory block
+   * @param address base address for the row
+   * @param rowBuffer array-backed buffer
+   * @return number of bytes written to memory
+   */
+  public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row,
+      Object baseObject, long address, ByteBuffer rowBuffer) {
+    int size = 0;
+    // write dict & sort
+    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+      CarbonUnsafe.getUnsafe()
+          .putInt(baseObject, address + size, (int) row[this.dictSortDimIdx[idx]]);
+      size += 4;
+    }
+
+    // write no-dict & sort
+    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
+      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
+      size += 2;
+      CarbonUnsafe.getUnsafe()
+          .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
+              bytes.length);
+      size += bytes.length;
+    }
+
+    // convert pack no-sort
+    rowBuffer.clear();
+    packNoSortFieldsToBytes(row, rowBuffer);
+    rowBuffer.flip();
+    int packSize = rowBuffer.limit();
+
+    // write no-sort
+    CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
+    size += 4;
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
+            packSize);
+    size += packSize;
+    return size;
+  }
+
+  /**
+   * Pack the no-sort fields into the byte array buffer
+   *
+   * @param row raw row
+   * @param rowBuffer byte-array-backed buffer
+   */
+  private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) {
+    // convert dict & no-sort
+    for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
+      rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]);
+    }
+    // convert no-dict & no-sort
+    for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
+      byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
+      rowBuffer.putShort((short) bytes.length);
+      rowBuffer.put(bytes);
+    }
+
+    // convert measure
+    Object tmpValue;
+    DataType tmpDataType;
+    for (int idx = 0; idx < this.measureCnt; idx++) {
+      tmpValue = row[this.measureIdx[idx]];
+      tmpDataType = this.dataTypes[idx];
+      if (null == tmpValue) {
+        rowBuffer.put((byte) 0);
+        continue;
+      }
+      rowBuffer.put((byte) 1);
+      if (DataTypes.BOOLEAN == tmpDataType) {
+        if ((boolean) tmpValue) {
+          rowBuffer.put((byte) 1);
+        } else {
+          rowBuffer.put((byte) 0);
+        }
+      } else if (DataTypes.SHORT == tmpDataType) {
+        rowBuffer.putShort((Short) tmpValue);
+      } else if (DataTypes.INT == tmpDataType) {
+        rowBuffer.putInt((Integer) tmpValue);
+      } else if (DataTypes.LONG == tmpDataType) {
+        rowBuffer.putLong((Long) tmpValue);
+      } else if (DataTypes.DOUBLE == tmpDataType) {
+        rowBuffer.putDouble((Double) tmpValue);
+      } else if (DataTypes.isDecimal(tmpDataType)) {
+        byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
+        rowBuffer.putShort((short) decimalBytes.length);
+        rowBuffer.put(decimalBytes);
+      } else {
+        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+      }
+    }
+  }
+}
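Putting the pieces together, a hedged end-to-end sketch of how the writer side (sort step) and the reader side (merge step) pair up. The real code obtains compressed streams via FileFactory and adds prefetching; the plain file streams, buffer size, and in-scope variables (tableFieldStat, sortedRows, sortTempFile) below are simplifications for illustration and assume the snippet sits inside a method:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;

SortStepRowHandler handler = new SortStepRowHandler(tableFieldStat);
ByteBuffer rowBuffer = ByteBuffer.allocate(2 * 1024 * 1024);   // reused per thread

// writer side: raw rows go straight to the sort temp file
try (DataOutputStream out = new DataOutputStream(
    new BufferedOutputStream(new FileOutputStream(sortTempFile)))) {
  out.writeInt(sortedRows.length);                             // entry count header
  for (Object[] rawRow : sortedRows) {
    handler.writeRawRowAsIntermediateSortTempRowToOutputStream(rawRow, out, rowBuffer);
  }
}

// reader side: the merger parses only the sort columns of each row
try (DataInputStream in = new DataInputStream(
    new BufferedInputStream(new FileInputStream(sortTempFile)))) {
  int entryCount = in.readInt();
  for (int i = 0; i < entryCount; i++) {
    IntermediateSortTempRow row = handler.readIntermediateSortTempRowFromInputStream(in);
    // compare rows on row.getDictSortDims() / row.getNoDictSortDims();
    // the packed no-sort bytes pass through untouched until the final merge
  }
}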

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
deleted file mode 100644
index c4e4756..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort;
-
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-
-public class SortStepRowUtil {
-  private int measureCount;
-  private int dimensionCount;
-  private int complexDimensionCount;
-  private int noDictionaryCount;
-  private int[] dictDimIdx;
-  private int[] nonDictIdx;
-  private int[] measureIdx;
-
-  public SortStepRowUtil(SortParameters parameters) {
-    this.measureCount = parameters.getMeasureColCount();
-    this.dimensionCount = parameters.getDimColCount();
-    this.complexDimensionCount = parameters.getComplexDimColCount();
-    this.noDictionaryCount = parameters.getNoDictionaryCount();
-    boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
-
-    int index = 0;
-    int nonDicIndex = 0;
-    int allCount = 0;
-
-    // be careful that the default value is 0
-    this.dictDimIdx = new int[dimensionCount - noDictionaryCount];
-    this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount];
-    this.measureIdx = new int[measureCount];
-
-    // indices for dict dim columns
-    for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
-      if (isNoDictionaryDimensionColumn[i]) {
-        nonDictIdx[nonDicIndex++] = i;
-      } else {
-        dictDimIdx[index++] = allCount;
-      }
-      allCount++;
-    }
-
-    // indices for non dict dim/complex columns
-    for (int i = 0; i < complexDimensionCount; i++) {
-      nonDictIdx[nonDicIndex++] = allCount;
-      allCount++;
-    }
-
-    // indices for measure columns
-    for (int i = 0; i < measureCount; i++) {
-      measureIdx[i] = allCount;
-      allCount++;
-    }
-  }
-
-  public Object[] convertRow(Object[] data) {
-    // create new row of size 3 (1 for dims , 1 for high card , 1 for measures)
-    Object[] holder = new Object[3];
-    try {
-
-      int[] dictDims = new int[dimensionCount - noDictionaryCount];
-      byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][];
-      Object[] measures = new Object[measureCount];
-
-      // write dict dim data
-      for (int idx = 0; idx < dictDimIdx.length; idx++) {
-        dictDims[idx] = (int) data[dictDimIdx[idx]];
-      }
-
-      // write non dict dim data
-      for (int idx = 0; idx < nonDictIdx.length; idx++) {
-        nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]];
-      }
-
-      // write measure data
-      for (int idx = 0; idx < measureIdx.length; idx++) {
-        measures[idx] = data[measureIdx[idx]];
-      }
-      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
-
-      // increment number if record read
-    } catch (Exception e) {
-      throw new RuntimeException("Problem while converting row ", e);
-    }
-    //return out row
-    return holder;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index e5583c2..7ea5cb3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -19,35 +19,20 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.util.Arrays;
+import java.nio.ByteBuffer;
 
-import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 /**
  * It can keep the data of prescribed size data in offheap/onheap memory and returns it when needed
  */
 public class UnsafeCarbonRowPage {
-
-  private boolean[] noDictionaryDimensionMapping;
-
-  private boolean[] noDictionarySortColumnMapping;
-
-  private int dimensionSize;
-
-  private int measureSize;
-
-  private DataType[] measureDataType;
-
-  private long[] nullSetWords;
-
   private IntPointerBuffer buffer;
 
   private int lastSize;
@@ -62,16 +47,14 @@ public class UnsafeCarbonRowPage {
 
   private long taskId;
 
-  public UnsafeCarbonRowPage(boolean[] noDictionaryDimensionMapping,
-      boolean[] noDictionarySortColumnMapping, int dimensionSize, int measureSize, DataType[] type,
-      MemoryBlock memoryBlock, boolean saveToDisk, long taskId) {
-    this.noDictionaryDimensionMapping = noDictionaryDimensionMapping;
-    this.noDictionarySortColumnMapping = noDictionarySortColumnMapping;
-    this.dimensionSize = dimensionSize;
-    this.measureSize = measureSize;
-    this.measureDataType = type;
+  private TableFieldStat tableFieldStat;
+  private SortStepRowHandler sortStepRowHandler;
+
+  public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
+      boolean saveToDisk, long taskId) {
+    this.tableFieldStat = tableFieldStat;
+    this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.saveToDisk = saveToDisk;
-    this.nullSetWords = new long[((measureSize - 1) >> 6) + 1];
     this.taskId = taskId;
     buffer = new IntPointerBuffer(this.taskId);
     this.dataBlock = memoryBlock;
@@ -80,255 +63,44 @@ public class UnsafeCarbonRowPage {
     this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
   }
 
-  public int addRow(Object[] row) {
-    int size = addRow(row, dataBlock.getBaseOffset() + lastSize);
+  public int addRow(Object[] row, ByteBuffer rowBuffer) {
+    int size = addRow(row, dataBlock.getBaseOffset() + lastSize, rowBuffer);
     buffer.set(lastSize);
     lastSize = lastSize + size;
     return size;
   }
 
-  private int addRow(Object[] row, long address) {
-    if (row == null) {
-      throw new RuntimeException("Row is null ??");
-    }
-    int dimCount = 0;
-    int size = 0;
-    Object baseObject = dataBlock.getBaseObject();
-    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
-      if (noDictionaryDimensionMapping[dimCount]) {
-        byte[] col = (byte[]) row[dimCount];
-        CarbonUnsafe.getUnsafe()
-            .putShort(baseObject, address + size, (short) col.length);
-        size += 2;
-        CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-            address + size, col.length);
-        size += col.length;
-      } else {
-        int value = (int) row[dimCount];
-        CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, value);
-        size += 4;
-      }
-    }
-
-    // write complex dimensions here.
-    for (; dimCount < dimensionSize; dimCount++) {
-      byte[] col = (byte[]) row[dimCount];
-      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) col.length);
-      size += 2;
-      CarbonUnsafe.getUnsafe().copyMemory(col, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-          address + size, col.length);
-      size += col.length;
-    }
-    Arrays.fill(nullSetWords, 0);
-    int nullSetSize = nullSetWords.length * 8;
-    int nullWordLoc = size;
-    size += nullSetSize;
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-      Object value = row[mesCount + dimensionSize];
-      if (null != value) {
-        DataType dataType = measureDataType[mesCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          Boolean bval = (Boolean) value;
-          CarbonUnsafe.getUnsafe().putBoolean(baseObject, address + size, bval);
-          size += 1;
-        } else if (dataType == DataTypes.SHORT) {
-          Short sval = (Short) value;
-          CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, sval);
-          size += 2;
-        } else if (dataType == DataTypes.INT) {
-          Integer ival = (Integer) value;
-          CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, ival);
-          size += 4;
-        } else if (dataType == DataTypes.LONG) {
-          Long val = (Long) value;
-          CarbonUnsafe.getUnsafe().putLong(baseObject, address + size, val);
-          size += 8;
-        } else if (dataType == DataTypes.DOUBLE) {
-          Double doubleVal = (Double) value;
-          CarbonUnsafe.getUnsafe().putDouble(baseObject, address + size, doubleVal);
-          size += 8;
-        } else if (DataTypes.isDecimal(dataType)) {
-          BigDecimal decimalVal = (BigDecimal) value;
-          byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(decimalVal);
-          CarbonUnsafe.getUnsafe()
-              .putShort(baseObject, address + size, (short) bigDecimalInBytes.length);
-          size += 2;
-          CarbonUnsafe.getUnsafe()
-              .copyMemory(bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject,
-                  address + size, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
-        } else {
-          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
-        }
-        set(nullSetWords, mesCount);
-      } else {
-        unset(nullSetWords, mesCount);
-      }
-    }
-    CarbonUnsafe.getUnsafe().copyMemory(nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET, baseObject,
-        address + nullWordLoc, nullSetSize);
-    return size;
+  /**
+   * add raw row as intermediate sort temp row to page
+   *
+   * @param row raw row to be converted and written
+   * @param address base address in unsafe memory where the row is written
+   * @return number of bytes written for this row
+   */
+  private int addRow(Object[] row, long address, ByteBuffer rowBuffer) {
+    return sortStepRowHandler.writeRawRowAsIntermediateSortTempRowToUnsafeMemory(row,
+        dataBlock.getBaseObject(), address, rowBuffer);
   }
 
-  public Object[] getRow(long address, Object[] rowToFill) {
-    int dimCount = 0;
-    int size = 0;
-
-    Object baseObject = dataBlock.getBaseObject();
-    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
-      if (noDictionaryDimensionMapping[dimCount]) {
-        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-        byte[] col = new byte[aShort];
-        size += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                col.length);
-        size += col.length;
-        rowToFill[dimCount] = col;
-      } else {
-        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-        size += 4;
-        rowToFill[dimCount] = anInt;
-      }
-    }
-
-    // write complex dimensions here.
-    for (; dimCount < dimensionSize; dimCount++) {
-      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-      byte[] col = new byte[aShort];
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
-      size += col.length;
-      rowToFill[dimCount] = col;
-    }
-
-    int nullSetSize = nullSetWords.length * 8;
-    Arrays.fill(nullSetWords, 0);
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
-            nullSetSize);
-    size += nullSetSize;
-
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-      if (isSet(nullSetWords, mesCount)) {
-        DataType dataType = measureDataType[mesCount];
-        if (dataType == DataTypes.BOOLEAN) {
-          Boolean bval = CarbonUnsafe.getUnsafe().getBoolean(baseObject, address + size);
-          size += 1;
-          rowToFill[dimensionSize + mesCount] = bval;
-        } else if (dataType == DataTypes.SHORT) {
-          Short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-          size += 2;
-          rowToFill[dimensionSize + mesCount] = sval;
-        } else if (dataType == DataTypes.INT) {
-          Integer ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-          size += 4;
-          rowToFill[dimensionSize + mesCount] = ival;
-        } else if (dataType == DataTypes.LONG) {
-          Long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
-          size += 8;
-          rowToFill[dimensionSize + mesCount] = val;
-        } else if (dataType == DataTypes.DOUBLE) {
-          Double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
-          size += 8;
-          rowToFill[dimensionSize + mesCount] = doubleVal;
-        } else if (DataTypes.isDecimal(dataType)) {
-          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-          byte[] bigDecimalInBytes = new byte[aShort];
-          size += 2;
-          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
-              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
-          rowToFill[dimensionSize + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes);
-        } else {
-          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
-        }
-      } else {
-        rowToFill[dimensionSize + mesCount] = null;
-      }
-    }
-    return rowToFill;
+  /**
+   * get one row from memory address
+   * @param address address
+   * @return one row
+   */
+  public IntermediateSortTempRow getRow(long address) {
+    return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory(
+        dataBlock.getBaseObject(), address);
   }
 
-  public void fillRow(long address, DataOutputStream stream) throws IOException {
-    int dimCount = 0;
-    int size = 0;
-
-    Object baseObject = dataBlock.getBaseObject();
-    for (; dimCount < noDictionaryDimensionMapping.length; dimCount++) {
-      if (noDictionaryDimensionMapping[dimCount]) {
-        short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-        byte[] col = new byte[aShort];
-        size += 2;
-        CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                col.length);
-        size += col.length;
-        stream.writeShort(aShort);
-        stream.write(col);
-      } else {
-        int anInt = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-        size += 4;
-        stream.writeInt(anInt);
-      }
-    }
-
-    // write complex dimensions here.
-    for (; dimCount < dimensionSize; dimCount++) {
-      short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-      byte[] col = new byte[aShort];
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(baseObject, address + size, col, CarbonUnsafe.BYTE_ARRAY_OFFSET, col.length);
-      size += col.length;
-      stream.writeShort(aShort);
-      stream.write(col);
-    }
-
-    int nullSetSize = nullSetWords.length * 8;
-    Arrays.fill(nullSetWords, 0);
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(baseObject, address + size, nullSetWords, CarbonUnsafe.LONG_ARRAY_OFFSET,
-            nullSetSize);
-    size += nullSetSize;
-    for (int i = 0; i < nullSetWords.length; i++) {
-      stream.writeLong(nullSetWords[i]);
-    }
-
-    for (int mesCount = 0; mesCount < measureSize; mesCount++) {
-      if (isSet(nullSetWords, mesCount)) {
-        DataType dataType = measureDataType[mesCount];
-        if (dataType == DataTypes.SHORT) {
-          short sval = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-          size += 2;
-          stream.writeShort(sval);
-        } else if (dataType == DataTypes.INT) {
-          int ival = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-          size += 4;
-          stream.writeInt(ival);
-        } else if (dataType == DataTypes.LONG) {
-          long val = CarbonUnsafe.getUnsafe().getLong(baseObject, address + size);
-          size += 8;
-          stream.writeLong(val);
-        } else if (dataType == DataTypes.DOUBLE) {
-          double doubleVal = CarbonUnsafe.getUnsafe().getDouble(baseObject, address + size);
-          size += 8;
-          stream.writeDouble(doubleVal);
-        } else if (DataTypes.isDecimal(dataType)) {
-          short aShort = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-          byte[] bigDecimalInBytes = new byte[aShort];
-          size += 2;
-          CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size, bigDecimalInBytes,
-              CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length);
-          size += bigDecimalInBytes.length;
-          stream.writeShort(aShort);
-          stream.write(bigDecimalInBytes);
-        } else {
-          throw new IllegalArgumentException("unsupported data type:" + measureDataType[mesCount]);
-        }
-      }
-    }
+  /**
+   * write a row to stream
+   * @param address address of a row
+   * @param stream output stream to write the row to
+   * @throws IOException
+   */
+  public void writeRow(long address, DataOutputStream stream) throws IOException {
+    sortStepRowHandler.writeIntermediateSortTempRowFromUnsafeMemoryToStream(
+        dataBlock.getBaseObject(), address, stream);
   }
 
   public void freeMemory() {
@@ -362,27 +134,8 @@ public class UnsafeCarbonRowPage {
     return dataBlock;
   }
 
-  public static void set(long[] words, int index) {
-    int wordOffset = (index >> 6);
-    words[wordOffset] |= (1L << index);
-  }
-
-  public static void unset(long[] words, int index) {
-    int wordOffset = (index >> 6);
-    words[wordOffset] &= ~(1L << index);
-  }
-
-  public static boolean isSet(long[] words, int index) {
-    int wordOffset = (index >> 6);
-    return ((words[wordOffset] & (1L << index)) != 0);
-  }
-
-  public boolean[] getNoDictionaryDimensionMapping() {
-    return noDictionaryDimensionMapping;
-  }
-
-  public boolean[] getNoDictionarySortColumnMapping() {
-    return noDictionarySortColumnMapping;
+  public TableFieldStat getTableFieldStat() {
+    return tableFieldStat;
   }
 
   public void setNewDataBlock(MemoryBlock newMemoryBlock) {

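With the field-specific branching gone, the page is reduced to address bookkeeping plus three delegating calls. A hedged sketch of the resulting round trip, assuming the page and the raw row come from the caller and that the page is empty before this call; the shared scratch buffer is single-threaded here for brevity, whereas UnsafeSortDataRows below uses a ThreadLocal.

    import java.nio.ByteBuffer;

    import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
    import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;

    // Sketch only: serialization and deserialization are both delegated to the
    // page's SortStepRowHandler; the caller just supplies a reusable scratch buffer.
    final class RowPageRoundTripSketch {
      private static final ByteBuffer SCRATCH = ByteBuffer.wrap(new byte[2 * 1024 * 1024]);

      static IntermediateSortTempRow writeAndReadBack(UnsafeCarbonRowPage page, Object[] rawRow) {
        page.addRow(rawRow, SCRATCH);
        // offsets stored in the page's IntPointerBuffer are relative to the block base;
        // index 0 assumes this was the first row added to an empty page
        long address = page.getBuffer().get(0) + page.getDataBlock().getBaseOffset();
        return page.getRow(address);
      }
    }
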
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
index 4dd5e44..5d038d3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeSortDataRows.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.processing.loading.sort.unsafe;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,13 +42,14 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
-import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDIms;
+import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger;
 import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort;
 import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 public class UnsafeSortDataRows {
@@ -69,7 +71,8 @@ public class UnsafeSortDataRows {
    */
 
   private SortParameters parameters;
-
+  private TableFieldStat tableFieldStat;
+  private ThreadLocal<ByteBuffer> rowBuffer;
   private UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger;
 
   private UnsafeCarbonRowPage rowPage;
@@ -94,7 +97,13 @@ public class UnsafeSortDataRows {
   public UnsafeSortDataRows(SortParameters parameters,
       UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) {
     this.parameters = parameters;
-
+    this.tableFieldStat = new TableFieldStat(parameters);
+    this.rowBuffer = new ThreadLocal<ByteBuffer>() {
+      @Override protected ByteBuffer initialValue() {
+        byte[] backedArray = new byte[2 * 1024 * 1024];
+        return ByteBuffer.wrap(backedArray);
+      }
+    };
     this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger;
 
     // observer of writing file in thread
@@ -127,11 +136,7 @@ public class UnsafeSortDataRows {
     if (isMemoryAvailable) {
       UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(baseBlock.size());
     }
-    this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
-        parameters.getNoDictionarySortColumn(),
-        parameters.getDimColCount() + parameters.getComplexDimColCount(),
-        parameters.getMeasureColCount(), parameters.getMeasureDataType(), baseBlock,
-        !isMemoryAvailable, taskId);
+    this.rowPage = new UnsafeCarbonRowPage(tableFieldStat, baseBlock, !isMemoryAvailable, taskId);
     // Delete if any older file exists in sort temp folder
     deleteSortLocationIfExists();
 
@@ -178,7 +183,7 @@ public class UnsafeSortDataRows {
   private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException {
     for (int i = 0; i < size; i++) {
       if (rowPage.canAdd()) {
-        bytesAdded += rowPage.addRow(rowBatch[i]);
+        bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
       } else {
         try {
           if (enableInMemoryIntermediateMerge) {
@@ -194,15 +199,8 @@ public class UnsafeSortDataRows {
           if (!saveToDisk) {
             UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
           }
-          rowPage = new UnsafeCarbonRowPage(
-                  parameters.getNoDictionaryDimnesionColumn(),
-                  parameters.getNoDictionarySortColumn(),
-                  parameters.getDimColCount() + parameters.getComplexDimColCount(),
-                  parameters.getMeasureColCount(),
-                  parameters.getMeasureDataType(),
-                  memoryBlock,
-                  saveToDisk, taskId);
-          bytesAdded += rowPage.addRow(rowBatch[i]);
+          rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+          bytesAdded += rowPage.addRow(rowBatch[i], rowBuffer.get());
         } catch (Exception e) {
           LOGGER.error(
                   "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -220,7 +218,7 @@ public class UnsafeSortDataRows {
     // if record holder list size is equal to sort buffer size then it will
     // sort the list and then write current list data to file
     if (rowPage.canAdd()) {
-      rowPage.addRow(row);
+      rowPage.addRow(row, rowBuffer.get());
     } else {
       try {
         if (enableInMemoryIntermediateMerge) {
@@ -235,13 +233,8 @@ public class UnsafeSortDataRows {
         if (!saveToDisk) {
           UnsafeSortMemoryManager.INSTANCE.allocateDummyMemory(memoryBlock.size());
         }
-        rowPage = new UnsafeCarbonRowPage(
-            parameters.getNoDictionaryDimnesionColumn(),
-            parameters.getNoDictionarySortColumn(),
-            parameters.getDimColCount(), parameters.getMeasureColCount(),
-            parameters.getMeasureDataType(), memoryBlock,
-            saveToDisk, taskId);
-        rowPage.addRow(row);
+        rowPage = new UnsafeCarbonRowPage(tableFieldStat, memoryBlock, saveToDisk, taskId);
+        rowPage.addRow(row, rowBuffer.get());
       } catch (Exception e) {
         LOGGER.error(
             "exception occurred while trying to acquire a semaphore lock: " + e.getMessage());
@@ -269,7 +262,7 @@ public class UnsafeSortDataRows {
             new UnsafeRowComparator(rowPage));
       } else {
         timSort.sort(rowPage.getBuffer(), 0, rowPage.getBuffer().getActualSize(),
-            new UnsafeRowComparatorForNormalDIms(rowPage));
+            new UnsafeRowComparatorForNormalDims(rowPage));
       }
       unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(rowPage);
     } else {
@@ -295,10 +288,9 @@ public class UnsafeSortDataRows {
       // write number of entries to the file
       stream.writeInt(actualSize);
       for (int i = 0; i < actualSize; i++) {
-        rowPage.fillRow(rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(),
-            stream);
+        rowPage.writeRow(
+            rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), stream);
       }
-
     } catch (IOException e) {
       throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
     } finally {
@@ -367,7 +359,7 @@ public class UnsafeSortDataRows {
               new UnsafeRowComparator(page));
         } else {
           timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(),
-              new UnsafeRowComparatorForNormalDIms(page));
+              new UnsafeRowComparatorForNormalDims(page));
         }
         if (page.isSaveToDisk()) {
           // create a new file every time
@@ -380,7 +372,8 @@ public class UnsafeSortDataRows {
           writeDataToFile(page, sortTempFile);
           LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize()
               + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:"
-              + sortTempFile);
+              + sortTempFile + ", sort temp file size in MB is "
+              + sortTempFile.length() * 0.1 * 10 / 1024 / 1024);
           page.freeMemory();
           // add sort temp filename to and arrayList. When the list size reaches 20 then
           // intermediate merging of sort temp files will be triggered

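The new ThreadLocal rowBuffer is what keeps the per-row path allocation free: each producer thread serializes into its own reusable 2 MB buffer before the bytes are copied into unsafe memory. The generic shape of that pattern, outside any CarbonData type:

    import java.nio.ByteBuffer;

    // Generic form of the per-thread scratch buffer added above: every producer thread
    // lazily gets its own 2 MB ByteBuffer and reuses it for each row it serializes,
    // keeping the per-row hot path free of allocations.
    final class ScratchBufferSketch {
      private static final ThreadLocal<ByteBuffer> ROW_BUFFER = new ThreadLocal<ByteBuffer>() {
        @Override protected ByteBuffer initialValue() {
          return ByteBuffer.wrap(new byte[2 * 1024 * 1024]);
        }
      };

      static ByteBuffer get() {
        ByteBuffer buffer = ROW_BUFFER.get();
        buffer.clear();   // reset position/limit before handing it out for reuse
        return buffer;
      }
    }
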
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
index d02be9b..33342dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparator.java
@@ -23,63 +23,25 @@ import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
-
-  /**
-   * mapping of dictionary and no dictionary of sort_columns.
-   */
-  private boolean[] noDictionarySortColumnMaping;
-
   private Object baseObject;
+  private TableFieldStat tableFieldStat;
+  private int dictSizeInMemory;
 
   public UnsafeRowComparator(UnsafeCarbonRowPage rowPage) {
-    this.noDictionarySortColumnMaping = rowPage.getNoDictionarySortColumnMapping();
     this.baseObject = rowPage.getDataBlock().getBaseObject();
+    this.tableFieldStat = rowPage.getTableFieldStat();
+    this.dictSizeInMemory = (tableFieldStat.getDictSortDimCnt()
+        + tableFieldStat.getDictNoSortDimCnt()) * 4;
   }
 
   /**
    * Below method will be used to compare two mdkey
    */
   public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    int diff = 0;
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    int sizeA = 0;
-    int sizeB = 0;
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
-      if (isNoDictionary) {
-        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowA + sizeA);
-        byte[] byteArr1 = new byte[aShort1];
-        sizeA += 2;
-        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowA + sizeA, byteArr1,
-            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort1);
-        sizeA += aShort1;
-
-        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObject, rowB + sizeB);
-        byte[] byteArr2 = new byte[aShort2];
-        sizeB += 2;
-        CarbonUnsafe.getUnsafe().copyMemory(baseObject, rowB + sizeB, byteArr2,
-            CarbonUnsafe.BYTE_ARRAY_OFFSET, aShort2);
-        sizeB += aShort2;
-
-        int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-        if (difference != 0) {
-          return difference;
-        }
-      } else {
-        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
-        sizeA += 4;
-        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
-        sizeB += 4;
-        diff = dimFieldA - dimFieldB;
-        if (diff != 0) {
-          return diff;
-        }
-      }
-    }
-
-    return diff;
+    return compare(rowL, baseObject, rowR, baseObject);
   }
 
   /**
@@ -90,35 +52,40 @@ public class UnsafeRowComparator implements Comparator<UnsafeCarbonRow> {
     int diff = 0;
     long rowA = rowL.address;
     long rowB = rowR.address;
-    int sizeA = 0;
-    int sizeB = 0;
-    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+    int sizeInDictPartA = 0;
+
+    int sizeInNonDictPartA = 0;
+    int sizeInDictPartB = 0;
+    int sizeInNonDictPartB = 0;
+    for (boolean isNoDictionary : tableFieldStat.getIsSortColNoDictFlags()) {
       if (isNoDictionary) {
-        short aShort1 = CarbonUnsafe.getUnsafe().getShort(baseObjectL, rowA + sizeA);
-        byte[] byteArr1 = new byte[aShort1];
-        sizeA += 2;
+        short lengthA = CarbonUnsafe.getUnsafe().getShort(baseObjectL,
+            rowA + dictSizeInMemory + sizeInNonDictPartA);
+        byte[] byteArr1 = new byte[lengthA];
+        sizeInNonDictPartA += 2;
         CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectL, rowA + sizeA, byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                aShort1);
-        sizeA += aShort1;
+            .copyMemory(baseObjectL, rowA + dictSizeInMemory + sizeInNonDictPartA,
+                byteArr1, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthA);
+        sizeInNonDictPartA += lengthA;
 
-        short aShort2 = CarbonUnsafe.getUnsafe().getShort(baseObjectR, rowB + sizeB);
-        byte[] byteArr2 = new byte[aShort2];
-        sizeB += 2;
+        short lengthB = CarbonUnsafe.getUnsafe().getShort(baseObjectR,
+            rowB + dictSizeInMemory + sizeInNonDictPartB);
+        byte[] byteArr2 = new byte[lengthB];
+        sizeInNonDictPartB += 2;
         CarbonUnsafe.getUnsafe()
-            .copyMemory(baseObjectR, rowB + sizeB, byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET,
-                aShort2);
-        sizeB += aShort2;
+            .copyMemory(baseObjectR, rowB + dictSizeInMemory + sizeInNonDictPartB,
+                byteArr2, CarbonUnsafe.BYTE_ARRAY_OFFSET, lengthB);
+        sizeInNonDictPartB += lengthB;
 
         int difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
         if (difference != 0) {
           return difference;
         }
       } else {
-        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeA);
-        sizeA += 4;
-        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeB);
-        sizeB += 4;
+        int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObjectL, rowA + sizeInDictPartA);
+        sizeInDictPartA += 4;
+        int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObjectR, rowB + sizeInDictPartB);
+        sizeInDictPartB += 4;
         diff = dimFieldA - dimFieldB;
         if (diff != 0) {
           return diff;

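The comparator now leans on a fixed row layout rather than walking every column: the dictionary dimensions are written first as 4-byte ints, followed by the no-dictionary part where each value carries a 2-byte length prefix, so dictSizeInMemory is simply the byte offset where the no-dictionary region starts. A heap-based sketch of the same comparison order, assuming that layout; unsigned byte comparison stands in for UnsafeComparer and native byte-order details of the unsafe layout are ignored.

    import java.nio.ByteBuffer;

    // Heap-based illustration of the comparison order used above. The flags array
    // mirrors TableFieldStat.getIsSortColNoDictFlags().
    final class SortColumnCompareSketch {

      static int compare(byte[] rowA, byte[] rowB, boolean[] isSortColNoDict, int dictDimCnt) {
        int dictSizeInMemory = dictDimCnt * 4;   // byte offset where the no-dict region starts
        ByteBuffer dictA = ByteBuffer.wrap(rowA);
        ByteBuffer dictB = ByteBuffer.wrap(rowB);
        ByteBuffer noDictA = ByteBuffer.wrap(rowA);
        ByteBuffer noDictB = ByteBuffer.wrap(rowB);
        noDictA.position(dictSizeInMemory);
        noDictB.position(dictSizeInMemory);
        for (boolean noDict : isSortColNoDict) {
          if (noDict) {
            byte[] a = new byte[noDictA.getShort()];
            noDictA.get(a);
            byte[] b = new byte[noDictB.getShort()];
            noDictB.get(b);
            int diff = compareUnsigned(a, b);
            if (diff != 0) {
              return diff;
            }
          } else {
            int diff = dictA.getInt() - dictB.getInt();
            if (diff != 0) {
              return diff;
            }
          }
        }
        return 0;
      }

      private static int compareUnsigned(byte[] a, byte[] b) {
        int minLen = Math.min(a.length, b.length);
        for (int i = 0; i < minLen; i++) {
          int diff = (a[i] & 0xff) - (b[i] & 0xff);
          if (diff != 0) {
            return diff;
          }
        }
        return a.length - b.length;
      }
    }
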
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
deleted file mode 100644
index 483dcb2..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDIms.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
-
-import java.util.Comparator;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
-
-public class UnsafeRowComparatorForNormalDIms implements Comparator<UnsafeCarbonRow> {
-
-  private Object baseObject;
-
-  private int numberOfSortColumns;
-
-  public UnsafeRowComparatorForNormalDIms(UnsafeCarbonRowPage rowPage) {
-    this.baseObject = rowPage.getDataBlock().getBaseObject();
-    this.numberOfSortColumns = rowPage.getNoDictionarySortColumnMapping().length;
-  }
-
-  /**
-   * Below method will be used to compare two mdkey
-   */
-  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
-    int diff = 0;
-    long rowA = rowL.address;
-    long rowB = rowR.address;
-    int sizeA = 0;
-    int sizeB = 0;
-    for (int i = 0; i < numberOfSortColumns; i++) {
-      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
-      sizeA += 4;
-      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
-      sizeB += 4;
-      diff = dimFieldA - dimFieldB;
-      if (diff != 0) {
-        return diff;
-      }
-    }
-
-    return diff;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
new file mode 100644
index 0000000..e9cfb1c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/comparator/UnsafeRowComparatorForNormalDims.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort.unsafe.comparator;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
+import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;
+
+public class UnsafeRowComparatorForNormalDims implements Comparator<UnsafeCarbonRow> {
+
+  private Object baseObject;
+
+  private int numberOfSortColumns;
+
+  public UnsafeRowComparatorForNormalDims(UnsafeCarbonRowPage rowPage) {
+    this.baseObject = rowPage.getDataBlock().getBaseObject();
+    this.numberOfSortColumns = rowPage.getTableFieldStat().getIsSortColNoDictFlags().length;
+  }
+
+  /**
+   * Below method will be used to compare two rows on their dictionary sort columns
+   */
+  public int compare(UnsafeCarbonRow rowL, UnsafeCarbonRow rowR) {
+    int diff = 0;
+    long rowA = rowL.address;
+    long rowB = rowR.address;
+    int sizeA = 0;
+    int sizeB = 0;
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      int dimFieldA = CarbonUnsafe.getUnsafe().getInt(baseObject, rowA + sizeA);
+      sizeA += 4;
+      int dimFieldB = CarbonUnsafe.getUnsafe().getInt(baseObject, rowB + sizeB);
+      sizeB += 4;
+      diff = dimFieldA - dimFieldB;
+      if (diff != 0) {
+        return diff;
+      }
+    }
+
+    return diff;
+  }
+}
\ No newline at end of file

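This class is the cheap path: when every sort column is dictionary encoded, rows can be ordered by sequential int reads alone, with no length prefixes to decode. UnsafeSortDataRows above picks between the two comparators; the exact condition it uses sits outside this hunk, so the sketch below derives an equivalent flag from the page's field stats (illustrative only).

    import java.util.Comparator;

    import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
    import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator;
    import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims;
    import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow;

    // Illustrative comparator choice: the byte-aware comparator is only needed when at
    // least one sort column is no-dictionary; otherwise the int-only comparator above
    // is enough. The real dispatch in UnsafeSortDataRows uses its own condition.
    final class ComparatorChoiceSketch {
      static Comparator<UnsafeCarbonRow> chooseComparator(UnsafeCarbonRowPage page) {
        for (boolean noDict : page.getTableFieldStat().getIsSortColNoDictFlags()) {
          if (noDict) {
            return new UnsafeRowComparator(page);
          }
        }
        return new UnsafeRowComparatorForNormalDims(page);
      }
    }
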
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
index 686e855..d790c41 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/SortTempChunkHolder.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 /**
@@ -28,7 +29,7 @@ public interface SortTempChunkHolder extends Comparable<SortTempChunkHolder> {
 
   void readRow()  throws CarbonSortKeyAndGroupByException;
 
-  Object[] getRow();
+  IntermediateSortTempRow getRow();
 
   int numberOfRows();
 

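The holder contract stays cursor style, but consumers now get a decoded IntermediateSortTempRow instead of reassembling an Object[] themselves. A minimal consumption loop under that contract, with error handling reduced to the checked exception already declared by readRow():

    import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
    import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
    import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;

    // Cursor-style consumption under the new contract: readRow() advances the holder,
    // getRow() hands back the already-decoded IntermediateSortTempRow.
    final class HolderDrainSketch {
      static long drain(SortTempChunkHolder holder) throws CarbonSortKeyAndGroupByException {
        long rowCount = 0;
        while (holder.hasNext()) {
          holder.readRow();
          IntermediateSortTempRow row = holder.getRow();
          // hand 'row' to the next stage of the merge here
          rowCount++;
        }
        return rowCount;
      }
    }
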
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 6b0cfa6..a776db1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -19,9 +19,10 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeInMemoryIntermediateDataMerger;
-import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
 
 public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
 
@@ -38,21 +39,18 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
 
   private UnsafeCarbonRowPage[] rowPages;
 
-  private NewRowComparator comparator;
+  private IntermediateSortTempRowComparator comparator;
 
-  private Object[] currentRow;
-
-  private int columnSize;
+  private IntermediateSortTempRow currentRow;
 
   public UnsafeFinalMergePageHolder(UnsafeInMemoryIntermediateDataMerger merger,
-      boolean[] noDictSortColumnMapping, int columnSize) {
+      boolean[] noDictSortColumnMapping) {
     this.actualSize = merger.getEntryCount();
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
     this.rowPages = merger.getUnsafeCarbonRowPages();
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(noDictSortColumnMapping);
-    this.columnSize = columnSize;
+    this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
   }
 
   public boolean hasNext() {
@@ -63,12 +61,11 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
   }
 
   public void readRow() {
-    currentRow = new Object[columnSize];
-    rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter], currentRow);
+    currentRow = rowPages[rowPageIndexes[counter]].getRow(mergedAddresses[counter]);
     counter++;
   }
 
-  public Object[] getRow() {
+  public IntermediateSortTempRow getRow() {
     return currentRow;
   }
 

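Because SortTempChunkHolder extends Comparable, holders like the one above can sit directly in a priority queue, presumably ordered by each holder's current row via the comparator fields shown. A simplified k-way merge over such holders; resource cleanup and the actual row consumer are omitted.

    import java.util.List;
    import java.util.PriorityQueue;

    import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
    import org.apache.carbondata.processing.loading.sort.unsafe.holder.SortTempChunkHolder;
    import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;

    // Simplified k-way merge over holders; the real merger also writes the winning
    // rows to its output and releases resources, which is omitted here.
    final class HolderMergeSketch {
      static void merge(List<SortTempChunkHolder> holders) throws CarbonSortKeyAndGroupByException {
        PriorityQueue<SortTempChunkHolder> heap =
            new PriorityQueue<>(Math.max(1, holders.size()));
        for (SortTempChunkHolder holder : holders) {
          if (holder.hasNext()) {
            holder.readRow();          // position each holder on its first row
            heap.add(holder);
          }
        }
        while (!heap.isEmpty()) {
          SortTempChunkHolder smallest = heap.poll();
          IntermediateSortTempRow next = smallest.getRow();
          // consume 'next' (write it to the sorted output) here
          if (smallest.hasNext()) {
            smallest.readRow();
            heap.add(smallest);        // re-queue with its new current row
          }
        }
      }
    }
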
http://git-wip-us.apache.org/repos/asf/carbondata/blob/de92ea9a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 6f05088..cbcbbae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -19,8 +19,9 @@ package org.apache.carbondata.processing.loading.sort.unsafe.holder;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.unsafe.UnsafeCarbonRowPage;
-import org.apache.carbondata.processing.sort.sortdata.NewRowComparator;
+import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
 
 public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
@@ -33,21 +34,18 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
 
   private UnsafeCarbonRowPage rowPage;
 
-  private Object[] currentRow;
+  private IntermediateSortTempRow currentRow;
 
   private long address;
 
-  private NewRowComparator comparator;
+  private IntermediateSortTempRowComparator comparator;
 
-  private int columnSize;
-
-  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage, int columnSize,
-      int numberOfSortColumns) {
+  public UnsafeInmemoryHolder(UnsafeCarbonRowPage rowPage) {
     this.actualSize = rowPage.getBuffer().getActualSize();
     this.rowPage = rowPage;
     LOGGER.audit("Processing unsafe inmemory rows page with size : " + actualSize);
-    this.comparator = new NewRowComparator(rowPage.getNoDictionarySortColumnMapping());
-    this.columnSize = columnSize;
+    this.comparator = new IntermediateSortTempRowComparator(
+        rowPage.getTableFieldStat().getIsSortColNoDictFlags());
   }
 
   public boolean hasNext() {
@@ -58,13 +56,12 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
   }
 
   public void readRow() {
-    currentRow = new Object[columnSize];
     address = rowPage.getBuffer().get(counter);
-    rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset(), currentRow);
+    currentRow = rowPage.getRow(address + rowPage.getDataBlock().getBaseOffset());
     counter++;
   }
 
-  public Object[] getRow() {
+  public IntermediateSortTempRow getRow() {
     return currentRow;
   }