You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/08/07 13:10:10 UTC

[46/50] [abbrv] carbondata git commit: [CARBONDATA-2836]Fixed data loading performance issue

[CARBONDATA-2836]Fixed data loading performance issue

Problem: Data loading takes too long when the number of records is very high (e.g. 3.5 billion records).

Root cause: During the final merge sort, the conversion of sort temp rows is done in the main thread, which slows down the final merge step.

Solution: Move the conversion logic to the pre-fetch thread so that it runs in parallel with the merge. This is done only for the final (single-thread) merge; the intermediate merge does not need to convert the no-sort columns.

This closes #2611


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f27efb3e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f27efb3e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f27efb3e

Branch: refs/heads/external-format
Commit: f27efb3e3757619ebdd822da8b6ab9737602de7b
Parents: b9e5106
Author: kumarvishal09 <ku...@gmail.com>
Authored: Mon Aug 6 19:00:27 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Aug 7 17:02:07 2018 +0530

----------------------------------------------------------------------
 .../loading/row/IntermediateSortTempRow.java    | 109 ++---------
 .../loading/sort/SortStepRowHandler.java        | 194 ++++++++++++++++---
 .../sort/unsafe/UnsafeCarbonRowPage.java        |  14 +-
 .../holder/UnsafeFinalMergePageHolder.java      |   3 +
 .../unsafe/holder/UnsafeInmemoryHolder.java     |   1 +
 .../holder/UnsafeSortTempFileChunkHolder.java   |  19 +-
 .../merger/UnsafeIntermediateFileMerger.java    |   3 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |   5 +-
 .../sort/sortdata/IntermediateFileMerger.java   |   3 +-
 .../SingleThreadFinalSortFilesMerger.java       |   2 +-
 .../sort/sortdata/SortTempFileChunkHolder.java  |  19 +-
 11 files changed, 234 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 47b419e..1ad7879 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -16,12 +16,6 @@
  */
 package org.apache.carbondata.processing.loading.row;
 
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
 /**
  * During sort procedure, each row will be written to sort temp file in this logic format.
  * an intermediate sort temp row consists 3 parts:
@@ -30,7 +24,16 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 public class IntermediateSortTempRow {
   private int[] dictSortDims;
   private byte[][] noDictSortDims;
+  /**
+   * this will be used for intermediate merger when
+   * no sort field and measure field will not be
+   * used for sorting
+   */
   private byte[] noSortDimsAndMeasures;
+  /**
+   * for final merger keep the measures
+   */
+  private Object[] measures;
 
   public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
       byte[] noSortDimsAndMeasures) {
@@ -39,10 +42,21 @@ public class IntermediateSortTempRow {
     this.noSortDimsAndMeasures = noSortDimsAndMeasures;
   }
 
+  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
+      Object[] measures) {
+    this.dictSortDims = dictSortDims;
+    this.noDictSortDims = noDictSortDims;
+    this.measures = measures;
+  }
+
   public int[] getDictSortDims() {
     return dictSortDims;
   }
 
+  public Object[] getMeasures() {
+    return measures;
+  }
+
   public byte[][] getNoDictSortDims() {
     return noDictSortDims;
   }
@@ -50,87 +64,4 @@ public class IntermediateSortTempRow {
   public byte[] getNoSortDimsAndMeasures() {
     return noSortDimsAndMeasures;
   }
-
-  /**
-   * deserialize from bytes array to get the no sort fields
-   * @param outDictNoSort stores the dict & no-sort fields
-   * @param outNoDictNoSort stores all no-dict & no-sort fields,
-   *                        including complex and varchar fields
-   * @param outMeasures stores the measure fields
-   * @param dataTypes data type for the measure
-   * @param varcharDimCnt number of varchar column
-   * @param complexDimCnt number of complex column
-   */
-  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
-      Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt, int complexDimCnt) {
-    ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
-    // read dict_no_sort
-    int dictNoSortCnt = outDictNoSort.length;
-    for (int i = 0; i < dictNoSortCnt; i++) {
-      outDictNoSort[i] = rowBuffer.getInt();
-    }
-
-    // read no_dict_no_sort
-    int noDictNoSortCnt = outNoDictNoSort.length - varcharDimCnt - complexDimCnt;
-    for (int i = 0; i < noDictNoSortCnt; i++) {
-      short len = rowBuffer.getShort();
-      byte[] bytes = new byte[len];
-      rowBuffer.get(bytes);
-      outNoDictNoSort[i] = bytes;
-    }
-
-    // read varchar dims
-    for (int i = 0; i < varcharDimCnt; i++) {
-      int len = rowBuffer.getInt();
-      byte[] bytes = new byte[len];
-      rowBuffer.get(bytes);
-      outNoDictNoSort[i + noDictNoSortCnt] = bytes;
-    }
-
-    // read complex dims
-    for (int i = 0; i < complexDimCnt; i++) {
-      short len = rowBuffer.getShort();
-      byte[] bytes = new byte[len];
-      rowBuffer.get(bytes);
-      outNoDictNoSort[i + noDictNoSortCnt + varcharDimCnt] = bytes;
-    }
-
-    // read measure
-    int measureCnt = outMeasures.length;
-    DataType tmpDataType;
-    Object tmpContent;
-    for (short idx = 0 ; idx < measureCnt; idx++) {
-      if ((byte) 0 == rowBuffer.get()) {
-        outMeasures[idx] = null;
-        continue;
-      }
-
-      tmpDataType = dataTypes[idx];
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((byte) 1 == rowBuffer.get()) {
-          tmpContent = true;
-        } else {
-          tmpContent = false;
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        tmpContent = rowBuffer.getShort();
-      } else if (DataTypes.INT == tmpDataType) {
-        tmpContent = rowBuffer.getInt();
-      } else if (DataTypes.LONG == tmpDataType) {
-        tmpContent = rowBuffer.getLong();
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        tmpContent = rowBuffer.getDouble();
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        short len = rowBuffer.getShort();
-        byte[] decimalBytes = new byte[len];
-        rowBuffer.get(decimalBytes);
-        tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
-      } else {
-        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
-      }
-      outMeasures[idx] = tmpContent;
-    }
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 0118e4d..f6fc3ca 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -160,49 +160,62 @@ public class SortStepRowHandler implements Serializable {
    * @return 3-parted row
    */
   public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
-    int[] dictDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt
-                                    + this.varcharDimCnt + this.complexDimCnt][];
-
-    int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
-    byte[][] noDictNoSortAndVarcharComplexDims
-        = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt + this.complexDimCnt][];
-    Object[] measures = new Object[this.measureCnt];
-
-    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharComplexDims, measures,
-        this.dataTypes, this.varcharDimCnt, this.complexDimCnt);
+    Object[] out = new Object[3];
+    NonDictionaryUtil
+        .prepareOutObj(out, sortTempRow.getDictSortDims(), sortTempRow.getNoDictSortDims(),
+            sortTempRow.getMeasures());
+    return out;
+  }
 
-    // dict dims
-    System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
-        0, this.dictSortDimCnt);
-    System.arraycopy(dictNoSortDims, 0, dictDims,
-        this.dictSortDimCnt, this.dictNoSortDimCnt);;
+  /**
+   * Read intermediate sort temp row from InputStream.
+   * This method is used during the intermediate merge sort phase to read row from sort temp file.
+   *
+   * @param inputStream input stream
+   * @return a row that contains three parts
+   * @throws IOException if error occrus while reading from stream
+   */
+  public IntermediateSortTempRow readWithoutNoSortFieldConvert(
+      DataInputStream inputStream) throws IOException {
+    int[] dictSortDims = new int[this.dictSortDimCnt];
+    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
 
-    // no dict dims, including complex
-    System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
-        noDictArray, 0, this.noDictSortDimCnt);
-    System.arraycopy(noDictNoSortAndVarcharComplexDims, 0, noDictArray,
-        this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt + this.complexDimCnt);
+    // read dict & sort dim data
+    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
+      dictSortDims[idx] = inputStream.readInt();
+    }
 
-    // measures are already here
+    // read no-dict & sort data
+    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+      short len = inputStream.readShort();
+      byte[] bytes = new byte[len];
+      inputStream.readFully(bytes);
+      noDictSortDims[idx] = bytes;
+    }
 
-    Object[] holder = new Object[3];
-    NonDictionaryUtil.prepareOutObj(holder, dictDims, noDictArray, measures);
-    return holder;
+    // read no-dict dims & measures
+    int len = inputStream.readInt();
+    byte[] noSortDimsAndMeasures = new byte[len];
+    inputStream.readFully(noSortDimsAndMeasures);
+    // keeping no sort fields and measure in pack byte array as it will not participate in sort
+    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
   }
 
   /**
    * Read intermediate sort temp row from InputStream.
-   * This method is used during the merge sort phase to read row from sort temp file.
+   * This method is used during the final merge sort phase to read row from sort temp file and
+   * merged sort temp file.
    *
    * @param inputStream input stream
    * @return a row that contains three parts
    * @throws IOException if error occrus while reading from stream
    */
-  public IntermediateSortTempRow readIntermediateSortTempRowFromInputStream(
+  public IntermediateSortTempRow readWithNoSortFieldConvert(
       DataInputStream inputStream) throws IOException {
-    int[] dictSortDims = new int[this.dictSortDimCnt];
-    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
+    int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
+    byte[][] noDictSortDims =
+        new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+            + this.complexDimCnt][];
 
     // read dict & sort dim data
     for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
@@ -221,8 +234,80 @@ public class SortStepRowHandler implements Serializable {
     int len = inputStream.readInt();
     byte[] noSortDimsAndMeasures = new byte[len];
     inputStream.readFully(noSortDimsAndMeasures);
+    Object[] measure = new Object[this.measureCnt];
+    // unpack the no sort fields and measure fields
+    unpackNoSortFromBytes(noSortDimsAndMeasures, dictSortDims, noDictSortDims, measure);
+    return new IntermediateSortTempRow(dictSortDims, noDictSortDims,measure);
+  }
 
-    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
+  private void unpackNoSortFromBytes(byte[] noSortDimsAndMeasures, int[] dictDims,
+      byte[][] noDictDims, Object[] measures) {
+    ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
+    // read dict_no_sort
+    for (int i = dictSortDimCnt; i < dictDims.length; i++) {
+      dictDims[i] = rowBuffer.getInt();
+    }
+
+    int noDictIndex = noDictSortDimCnt;
+    // read no_dict_no_sort
+    for (int i = 0; i < noDictNoSortDimCnt; i++) {
+      short len = rowBuffer.getShort();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      noDictDims[noDictIndex++] = bytes;
+    }
+
+    // read varchar dims
+    for (int i = 0; i < varcharDimCnt; i++) {
+      int len = rowBuffer.getInt();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      noDictDims[noDictIndex++] = bytes;
+    }
+
+    // read complex dims
+    for (int i = 0; i < complexDimCnt; i++) {
+      short len = rowBuffer.getShort();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      noDictDims[noDictIndex++] = bytes;
+    }
+
+    // read measure
+    int measureCnt = measures.length;
+    DataType tmpDataType;
+    Object tmpContent;
+    for (short idx = 0 ; idx < measureCnt; idx++) {
+      if ((byte) 0 == rowBuffer.get()) {
+        measures[idx] = null;
+        continue;
+      }
+
+      tmpDataType = dataTypes[idx];
+      if (DataTypes.BOOLEAN == tmpDataType) {
+        if ((byte) 1 == rowBuffer.get()) {
+          tmpContent = true;
+        } else {
+          tmpContent = false;
+        }
+      } else if (DataTypes.SHORT == tmpDataType) {
+        tmpContent = rowBuffer.getShort();
+      } else if (DataTypes.INT == tmpDataType) {
+        tmpContent = rowBuffer.getInt();
+      } else if (DataTypes.LONG == tmpDataType) {
+        tmpContent = rowBuffer.getLong();
+      } else if (DataTypes.DOUBLE == tmpDataType) {
+        tmpContent = rowBuffer.getDouble();
+      } else if (DataTypes.isDecimal(tmpDataType)) {
+        short len = rowBuffer.getShort();
+        byte[] decimalBytes = new byte[len];
+        rowBuffer.get(decimalBytes);
+        tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
+      } else {
+        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
+      }
+      measures[idx] = tmpContent;
+    }
   }
 
   /**
@@ -298,7 +383,7 @@ public class SortStepRowHandler implements Serializable {
    * @param address address of the row
    * @return intermediate sort temp row
    */
-  public IntermediateSortTempRow readIntermediateSortTempRowFromUnsafeMemory(Object baseObject,
+  public IntermediateSortTempRow readFromMemoryWithoutNoSortFieldConvert(Object baseObject,
       long address) {
     int size = 0;
 
@@ -333,6 +418,51 @@ public class SortStepRowHandler implements Serializable {
   }
 
   /**
+   * Read intermediate sort temp row from unsafe memory.
+   * This method is used during merge sort phase for off-heap sort.
+   *
+   * @param baseObject base object of memory block
+   * @param address address of the row
+   * @return intermediate sort temp row
+   */
+  public IntermediateSortTempRow readRowFromMemoryWithNoSortFieldConvert(Object baseObject,
+      long address) {
+    int size = 0;
+
+    int[] dictSortDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
+    byte[][] noDictSortDims =
+        new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt
+            + this.complexDimCnt][];
+
+    // read dict & sort dim
+    for (int idx = 0; idx < dictSortDimCnt; idx++) {
+      dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+      size += 4;
+    }
+
+    // read no-dict & sort dim
+    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
+      short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
+      size += 2;
+      byte[] bytes = new byte[length];
+      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
+      size += length;
+      noDictSortDims[idx] = bytes;
+    }
+
+    // read no-sort dims & measures
+    int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
+    size += 4;
+    byte[] noSortDimsAndMeasures = new byte[len];
+    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
+        noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
+    Object[] measures = new Object[measureCnt];
+    unpackNoSortFromBytes(noSortDimsAndMeasures, dictSortDims, noDictSortDims, measures);
+    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, measures);
+  }
+
+  /**
    * Write intermediate sort temp row directly from unsafe memory to stream.
    * This method is used at the late beginning of the sort phase to write in-memory pages
    * to sort temp file. Comparing with reading intermediate sort temp row from memory and then
@@ -429,6 +559,8 @@ public class SortStepRowHandler implements Serializable {
     return size;
   }
 
+
+
   /**
    * Pack to no-sort fields to byte array
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 7ea5cb3..45cfa13 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -49,6 +49,7 @@ public class UnsafeCarbonRowPage {
 
   private TableFieldStat tableFieldStat;
   private SortStepRowHandler sortStepRowHandler;
+  private boolean convertNoSortFields;
 
   public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
       boolean saveToDisk, long taskId) {
@@ -88,8 +89,13 @@ public class UnsafeCarbonRowPage {
    * @return one row
    */
   public IntermediateSortTempRow getRow(long address) {
-    return sortStepRowHandler.readIntermediateSortTempRowFromUnsafeMemory(
-        dataBlock.getBaseObject(), address);
+    if (convertNoSortFields) {
+      return sortStepRowHandler
+          .readRowFromMemoryWithNoSortFieldConvert(dataBlock.getBaseObject(), address);
+    } else {
+      return sortStepRowHandler
+          .readFromMemoryWithoutNoSortFieldConvert(dataBlock.getBaseObject(), address);
+    }
   }
 
   /**
@@ -146,4 +152,8 @@ public class UnsafeCarbonRowPage {
   public enum MemoryManagerType {
     UNSAFE_MEMORY_MANAGER, UNSAFE_SORT_MEMORY_MANAGER
   }
+
+  public void setReadConvertedNoSortField() {
+    this.convertNoSortFields = true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index c966ff2..102b057 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -49,6 +49,9 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
     this.mergedAddresses = merger.getMergedAddresses();
     this.rowPageIndexes = merger.getRowPageIndexes();
     this.rowPages = merger.getUnsafeCarbonRowPages();
+    for (UnsafeCarbonRowPage rowPage: rowPages) {
+      rowPage.setReadConvertedNoSortField();
+    }
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
     this.comparator = new IntermediateSortTempRowComparator(noDictSortColumnMapping);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 8b4b550..02ffd68 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -46,6 +46,7 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
     this.comparator = new IntermediateSortTempRowComparator(
         rowPage.getTableFieldStat().getIsSortColNoDictFlags());
+    this.rowPage.setReadConvertedNoSortField();
   }
 
   public boolean hasNext() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 86b7ac8..7c3c056 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -96,10 +96,12 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   private TableFieldStat tableFieldStat;
   private SortStepRowHandler sortStepRowHandler;
   private Comparator<IntermediateSortTempRow> comparator;
+  private boolean convertNoSortFields;
   /**
    * Constructor to initialize
    */
-  public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters) {
+  public UnsafeSortTempFileChunkHolder(File tempFile, SortParameters parameters,
+      boolean convertNoSortFields) {
     // set temp file
     this.tempFile = tempFile;
     this.readBufferSize = parameters.getBufferSize();
@@ -108,6 +110,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     this.sortStepRowHandler = new SortStepRowHandler(tableFieldStat);
     this.executorService = Executors.newFixedThreadPool(1);
     comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn());
+    this.convertNoSortFields = convertNoSortFields;
     initialize();
   }
 
@@ -162,7 +165,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
       fillDataForPrefetch();
     } else {
       try {
-        this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+        if (convertNoSortFields) {
+          this.returnRow = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+        } else {
+          this.returnRow = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+        }
         this.numberOfObjectRead++;
       } catch (IOException e) {
         throw new CarbonSortKeyAndGroupByException("Problems while reading row", e);
@@ -210,9 +217,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
       throws IOException {
     IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
     for (int i = 0; i < expected; i++) {
-      IntermediateSortTempRow holder
-          = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
-      holders[i] = holder;
+      if (convertNoSortFields) {
+        holders[i] = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+      } else {
+        holders[i] = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+      }
     }
     this.numberOfObjectRead += expected;
     return holders;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
index c5b215e..0a12eda 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeIntermediateFileMerger.java
@@ -209,7 +209,8 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> {
 
     for (File tempFile : intermediateFiles) {
       // create chunk holder
-      sortTempFileChunkHolder = new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters);
+      sortTempFileChunkHolder =
+          new UnsafeSortTempFileChunkHolder(tempFile, mergerParameters, false);
 
       sortTempFileChunkHolder.readRow();
       this.totalNumberOfRecords += sortTempFileChunkHolder.numberOfRows();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index 6defeb7..2dd2f31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -143,7 +143,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
       for (final File file : filesToMergeSort) {
 
         SortTempChunkHolder sortTempFileChunkHolder =
-            new UnsafeSortTempFileChunkHolder(file, parameters);
+            new UnsafeSortTempFileChunkHolder(file, parameters, true);
 
         // initialize
         sortTempFileChunkHolder.readRow();
@@ -197,8 +197,7 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec
    */
   public Object[] next() {
     if (hasNext()) {
-      IntermediateSortTempRow sortTempRow = getSortedRecordFromFile();
-      return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(sortTempRow);
+      return sortStepRowHandler.convertIntermediateSortTempRowTo3Parted(getSortedRecordFromFile());
     } else {
       throw new NoSuchElementException("No more elements to return");
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 364515c..35563d0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -206,7 +206,8 @@ public class IntermediateFileMerger implements Callable<Void> {
     for (File tempFile : intermediateFiles) {
       // create chunk holder
       sortTempFileChunkHolder =
-          new SortTempFileChunkHolder(tempFile, mergerParameters, mergerParameters.getTableName());
+          new SortTempFileChunkHolder(tempFile, mergerParameters, mergerParameters.getTableName(),
+              false);
 
       // initialize
       sortTempFileChunkHolder.initialize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
index 09c1920..646969a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SingleThreadFinalSortFilesMerger.java
@@ -176,7 +176,7 @@ public class SingleThreadFinalSortFilesMerger extends CarbonIterator<Object[]> {
         @Override public Void call() throws CarbonSortKeyAndGroupByException {
             // create chunk holder
             SortTempFileChunkHolder sortTempFileChunkHolder =
-                new SortTempFileChunkHolder(tempFile, sortParameters, tableName);
+                new SortTempFileChunkHolder(tempFile, sortParameters, tableName, true);
           try {
             // initialize
             sortTempFileChunkHolder.initialize();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f27efb3e/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 7e221a7..e39fe1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -98,6 +98,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   private TableFieldStat tableFieldStat;
   private SortStepRowHandler sortStepRowHandler;
   private Comparator<IntermediateSortTempRow> comparator;
+  private boolean convertToActualField;
   /**
    * Constructor to initialize
    *
@@ -105,7 +106,8 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    * @param sortParameters
    * @param tableName
    */
-  public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName) {
+  public SortTempFileChunkHolder(File tempFile, SortParameters sortParameters, String tableName,
+      boolean convertToActualField) {
     // set temp file
     this.tempFile = tempFile;
     this.readBufferSize = sortParameters.getBufferSize();
@@ -116,6 +118,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
         tableFieldStat.getIsSortColNoDictFlags());
     this.executorService = Executors
         .newFixedThreadPool(1, new CarbonThreadFactory("SafeSortTempChunkHolderPool:" + tableName));
+    this.convertToActualField = convertToActualField;
   }
 
   /**
@@ -168,7 +171,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
       fillDataForPrefetch();
     } else {
       try {
-        this.returnRow = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
+        if (convertToActualField) {
+          this.returnRow = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+        } else {
+          this.returnRow = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+        }
         this.numberOfObjectRead++;
       } catch (IOException e) {
         throw new CarbonSortKeyAndGroupByException("Problem while reading rows", e);
@@ -214,9 +221,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   private IntermediateSortTempRow[] readBatchedRowFromStream(int expected) throws IOException {
     IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
     for (int i = 0; i < expected; i++) {
-      IntermediateSortTempRow holder
-          = sortStepRowHandler.readIntermediateSortTempRowFromInputStream(stream);
-      holders[i] = holder;
+      if (convertToActualField) {
+        holders[i] = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+      } else {
+        holders[i] = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
+      }
     }
     this.numberOfObjectRead += expected;
     return holders;