You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/11/02 13:47:31 UTC

[carbondata] branch master updated: [CARBONDATA-4043] Fix data load failure issue for columns added in legacy store

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d2171a7  [CARBONDATA-4043] Fix data load failure issue for columns added in legacy store
d2171a7 is described below

commit d2171a7baa0061892b7b6ff06b00424443a719d7
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Oct 21 19:13:24 2020 +0530

    [CARBONDATA-4043] Fix data load failure issue for columns added in legacy store
    
    Why is this PR needed?
    When a dimension is added in an older version such as 1.1, it becomes a sort column by default.
    The sort step assumes that sort column data comes first in the row, but the added
    column is placed last even though it is a sort column. So, while building
    the data load configuration, we rearrange the columns (dimensions
    and data fields) to bring the sort columns to the beginning and the no-sort columns to the end,
    and revert them back to schema order before the FinalMerge/DataWriter step.
    
    Issue:
    Data loading fails with a cast exception in the data writing step in case
    of NO_SORT, and in the final sort step in case of LOCAL_SORT.
    NO_SORT:
    In this flow, the data fields are rearranged based on the sort column order, which is not required.
    LOCAL_SORT:
    During FinalMerge, we assume the intermediate row data contains only sort column
    data in schema order. But the intermediate sort row data can also contain no-dictionary
    and dictionary data of no-sort columns. So, the available information is insufficient to determine the actual index of the sort column data.
    
    What changes were proposed in this PR?
    NO_SORT:
    Update the data fields based on sort columns only if the table is a sort table and the sort scope is not NO_SORT.
    LOCAL_SORT:
    Collect the sort-column and dictionary info of all the dimensions into a map, use it to identify the index of each sort column, and compare only sort column data in the final merge.
    
    This closes #3995
---
 .../query/SecondaryIndexQueryResultProcessor.java  |   2 +
 .../processing/loading/DataLoadProcessBuilder.java |   7 +-
 .../unsafe/holder/UnsafeFinalMergePageHolder.java  |   4 +-
 .../sort/unsafe/holder/UnsafeInmemoryHolder.java   |   5 +-
 .../holder/UnsafeSortTempFileChunkHolder.java      |   5 +-
 .../steps/DataConverterProcessorStepImpl.java      |   4 +-
 .../merger/CompactionResultSortProcessor.java      |   2 +
 .../sort/sortdata/FileMergeSortComparator.java     |  91 ++++++++++-----
 .../processing/sort/sortdata/SortParameters.java   |  38 ++++++-
 .../sort/sortdata/SortTempFileChunkHolder.java     |  11 +-
 .../processing/sort/sortdata/TableFieldStat.java   |  14 +++
 .../processing/util/CarbonDataProcessorUtil.java   |  52 ++++++++-
 .../sort/sortdata/FileMergeSortComparatorTest.java | 125 +++++++++++++++++++++
 13 files changed, 313 insertions(+), 47 deletions(-)

diff --git a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
index 41a5c43..4d045f0 100644
--- a/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
+++ b/integration/spark/src/main/java/org/apache/spark/sql/secondaryindex/query/SecondaryIndexQueryResultProcessor.java
@@ -528,6 +528,8 @@ public class SecondaryIndexQueryResultProcessor {
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     sortParameters.setNoDictionarySortColumn(
         CarbonDataProcessorUtil.getNoDictSortColMapping(indexTable));
+    sortParameters.setSortColumnSchemaOrderMap(
+        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(indexTable));
     finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation,
         indexTable.getTableName(), sortParameters);
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 7935a5a..f979f2b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -246,19 +246,22 @@ public final class DataLoadProcessBuilder {
     List<DataField> dataFields = new ArrayList<>();
     List<DataField> complexDataFields = new ArrayList<>();
     List<DataField> partitionColumns = new ArrayList<>();
+    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     if (loadModel.isLoadWithoutConverterWithoutReArrangeStep()) {
       // To avoid, reArranging of the data for each row, re arrange the schema itself.
       getReArrangedDataFields(loadModel, carbonTable, dimensions, measures, complexDataFields,
           partitionColumns, dataFields);
     } else {
       getDataFields(loadModel, dimensions, measures, complexDataFields, dataFields);
-      dataFields = updateDataFieldsBasedOnSortColumns(dataFields);
+      if (!(!configuration.isSortTable() || SortScopeOptions.getSortScope(loadModel.getSortScope())
+          .equals(SortScopeOptions.SortScope.NO_SORT))) {
+        dataFields = updateDataFieldsBasedOnSortColumns(dataFields);
+      }
     }
     configuration.setDataFields(dataFields.toArray(new DataField[0]));
     configuration.setBucketingInfo(carbonTable.getBucketingInfo());
     configuration.setBucketHashMethod(carbonTable.getBucketHashMethod());
     configuration.setPreFetch(loadModel.isPreFetch());
-    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
     configuration.setDataWritePath(loadModel.getDataWritePath());
     setSortColumnInfo(carbonTable, loadModel, configuration);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
index 4bf100c..717bb91 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeFinalMergePageHolder.java
@@ -62,7 +62,9 @@ public class UnsafeFinalMergePageHolder implements SortTempChunkHolder {
     this.noDictDataType = rowPages[0].getTableFieldStat().getNoDictDataType();
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
     this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-        tableFieldStat.getNoDictDataType(), tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+        tableFieldStat.getNoDictSchemaDataType(),
+        tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
+        tableFieldStat.getSortColSchemaOrderMap());
   }
 
   public boolean hasNext() {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
index 81fa661..a46811f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeInmemoryHolder.java
@@ -49,8 +49,9 @@ public class UnsafeInmemoryHolder implements SortTempChunkHolder {
     LOGGER.info("Processing unsafe inmemory rows page with size : " + actualSize);
     this.comparator =
         new FileMergeSortComparator(rowPage.getTableFieldStat().getIsSortColNoDictFlags(),
-            rowPage.getTableFieldStat().getNoDictDataType(),
-            rowPage.getTableFieldStat().getNoDictSortColumnSchemaOrderMapping());
+            rowPage.getTableFieldStat().getNoDictSchemaDataType(),
+            rowPage.getTableFieldStat().getNoDictSortColumnSchemaOrderMapping(),
+            rowPage.getTableFieldStat().getSortColSchemaOrderMap());
     this.rowPage.setReadConvertedNoSortField();
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 47f5d2f..5144f57 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -121,8 +121,9 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
           parameters.getNoDictDataType());
     } else {
       this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-          tableFieldStat.getNoDictDataType(),
-          tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+          tableFieldStat.getNoDictSchemaDataType(),
+          tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
+          tableFieldStat.getSortColSchemaOrderMap());
     }
     initialize();
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index f82b715..d3b3d5a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -152,13 +152,13 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     Arrays.sort(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
             sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
-            .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable())));
+            .getNoDictSortDataTypes(configuration.getTableSpec().getCarbonTable())));
 
     // range partitioner to dispatch rows by sort columns
     this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
             sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
-            .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable())));
+            .getNoDictSortDataTypes(configuration.getTableSpec().getCarbonTable())));
   }
 
   // only convert sort column fields
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 8c43631..9cfda1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -480,6 +480,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
         .getNoDictSortColMapping(carbonTable);
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
+    sortParameters.setSortColumnSchemaOrderMap(
+        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable));
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     finalMerger =
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
index a75af67..687af7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparator.java
@@ -18,6 +18,8 @@
 package org.apache.carbondata.processing.sort.sortdata;
 
 import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil;
@@ -29,7 +31,10 @@ public class FileMergeSortComparator implements Comparator<IntermediateSortTempR
 
   private boolean[] isSortColumnNoDictionary;
 
-  private DataType[] noDicSortDataTypes;
+  /**
+   * Datatype of all the no-dictionary columns in the table in schema order
+   */
+  private DataType[] noDictDataTypes;
 
   /**
    * Index of the no dict Sort columns in the carbonRow for final merge step of sorting.
@@ -37,17 +42,35 @@ public class FileMergeSortComparator implements Comparator<IntermediateSortTempR
   private int[] noDictPrimitiveIndex;
 
   /**
+   * Sort and Dictionary info of all the columns in schema order
+   */
+  private final Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
+
+  /**
    * Comparator for IntermediateSortTempRow for compatibility cases where column added in old
    * version and it is sort column
    * @param isSortColumnNoDictionary isSortColumnNoDictionary
    */
-  public FileMergeSortComparator(boolean[] isSortColumnNoDictionary, DataType[] noDicSortDataTypes,
-      int[] columnIdBasedOnSchemaInRow) {
+  public FileMergeSortComparator(boolean[] isSortColumnNoDictionary, DataType[] noDictDataTypes,
+      int[] columnIdBasedOnSchemaInRow, Map<Integer, List<Boolean>> sortColumnSchemaOrderMap) {
     this.isSortColumnNoDictionary = isSortColumnNoDictionary;
-    this.noDicSortDataTypes = noDicSortDataTypes;
+    this.noDictDataTypes = noDictDataTypes;
     this.noDictPrimitiveIndex = columnIdBasedOnSchemaInRow;
+    this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
   }
 
+  /**
+   * In the final merging, intermediate sortTemp row will have all the data in schema order.
+   * IntermediateSortTempRow#getNoDictSortDims() can return the following data:
+   * 1. only no-Dictionary sort column data
+   * 2. no-Dictionary sort and no-Dictionary no-sort column data
+   * IntermediateSortTempRow#getDictSortDims() can return the following data:
+   * 1. only Dictionary sort column data
+   * 2. dictionary sort and dictionary no-sort column data
+   * On this temp data row, we have to identify the sort column data and only compare those.
+   * From the sortColumnSchemaOrderMap, get the sort column and dict info. If the column
+   * is sort column, then perform the comparison, else increment the respective index.
+   */
   @Override
   public int compare(IntermediateSortTempRow rowA, IntermediateSortTempRow rowB) {
     int diff = 0;
@@ -55,41 +78,49 @@ public class FileMergeSortComparator implements Comparator<IntermediateSortTempR
     int nonDictIndex = 0;
     int noDicTypeIdx = 0;
     int schemaRowIdx = 0;
+    int sortIndex = 0;
 
-    for (boolean isNoDictionary : isSortColumnNoDictionary) {
+    for (Map.Entry<Integer, List<Boolean>> schemaEntry : sortColumnSchemaOrderMap.entrySet()) {
+      boolean isSortColumn = schemaEntry.getValue().get(0);
+      boolean isDictColumn = schemaEntry.getValue().get(1);
+      if (isSortColumn) {
+        if (isSortColumnNoDictionary[sortIndex++]) {
+          if (DataTypeUtil.isPrimitiveColumn(noDictDataTypes[noDicTypeIdx])) {
+            // use data types based comparator for the no dictionary measure columns
+            SerializableComparator comparator =
+                org.apache.carbondata.core.util.comparator.Comparator
+                    .getComparator(noDictDataTypes[noDicTypeIdx]);
+            int difference = comparator
+                .compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
+                    rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
+            schemaRowIdx++;
+            if (difference != 0) {
+              return difference;
+            }
+          } else {
+            byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
+            byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
 
-      if (isNoDictionary) {
-        if (DataTypeUtil.isPrimitiveColumn(noDicSortDataTypes[noDicTypeIdx])) {
-          // use data types based comparator for the no dictionary measure columns
-          SerializableComparator comparator = org.apache.carbondata.core.util.comparator.Comparator
-              .getComparator(noDicSortDataTypes[noDicTypeIdx]);
-          int difference = comparator
-              .compare(rowA.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]],
-                  rowB.getNoDictSortDims()[noDictPrimitiveIndex[schemaRowIdx]]);
-          schemaRowIdx++;
-          if (difference != 0) {
-            return difference;
+            int difference = ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+            if (difference != 0) {
+              return difference;
+            }
           }
         } else {
-          byte[] byteArr1 = (byte[]) rowA.getNoDictSortDims()[nonDictIndex];
-          byte[] byteArr2 = (byte[]) rowB.getNoDictSortDims()[nonDictIndex];
+          int dimFieldA = rowA.getDictSortDims()[dictIndex];
+          int dimFieldB = rowB.getDictSortDims()[dictIndex];
 
-          int difference = ByteUtil.UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
-          if (difference != 0) {
-            return difference;
+          diff = dimFieldA - dimFieldB;
+          if (diff != 0) {
+            return diff;
           }
         }
+      }
+      if (isDictColumn) {
+        dictIndex++;
+      } else {
         nonDictIndex++;
         noDicTypeIdx++;
-      } else {
-        int dimFieldA = rowA.getDictSortDims()[dictIndex];
-        int dimFieldB = rowB.getDictSortDims()[dictIndex];
-        dictIndex++;
-
-        diff = dimFieldA - dimFieldB;
-        if (diff != 0) {
-          return diff;
-        }
       }
     }
     return diff;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 9d41854..fc93d52 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.processing.sort.sortdata;
 
 import java.io.File;
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -103,6 +104,13 @@ public class SortParameters implements Serializable {
   // used while writing the row to sort temp file where nosort nodict columns are handled seperately
   private DataType[] noDictNoSortDataType;
 
+  // no dictionary columns in schema order participating in sort
+  // used while performing final sort of intermediate files
+  private DataType[] noDictSchemaDataType;
+
+  // Sort and Dictionary info of all the columns in schema order, used in final sort
+  private Map<Integer, List<Boolean>> sortColumnSchemaOrderMap;
+
   /**
    * To know how many columns are of high cardinality.
    */
@@ -200,6 +208,8 @@ public class SortParameters implements Serializable {
     parameters.noDictActualPosition = noDictActualPosition;
     parameters.noDictSortColumnSchemaOrderMapping = noDictSortColumnSchemaOrderMapping;
     parameters.isInsertWithoutReArrangeFlow = isInsertWithoutReArrangeFlow;
+    parameters.noDictSchemaDataType = noDictSchemaDataType;
+    parameters.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
     return parameters;
   }
 
@@ -519,7 +529,11 @@ public class SortParameters implements Serializable {
           .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
       parameters.setMeasureDataType(configuration.getMeasureDataType());
       parameters.setNoDictDataType(CarbonDataProcessorUtil
-          .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()));
+          .getNoDictSortDataTypes(configuration.getTableSpec().getCarbonTable()));
+      parameters.setSortColumnSchemaOrderMap(
+          CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
+      parameters.setNoDictSchemaDataType(
+          CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
       Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
           .getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
       parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
@@ -606,13 +620,17 @@ public class SortParameters implements Serializable {
         .getMeasureDataType(parameters.getMeasureColCount(), parameters.getCarbonTable());
     parameters.setMeasureDataType(type);
     parameters.setNoDictDataType(CarbonDataProcessorUtil
-        .getNoDictDataTypes(carbonTable));
+        .getNoDictSortDataTypes(carbonTable));
     Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
         .getNoDictSortAndNoSortDataTypes(parameters.getCarbonTable());
     parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
     parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
         .getNoDictSortColMapping(parameters.getCarbonTable()));
+    parameters.setSortColumnSchemaOrderMap(
+        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(parameters.carbonTable));
+    parameters.setNoDictSchemaDataType(
+        CarbonDataProcessorUtil.getNoDictDataTypes(parameters.carbonTable));
     parameters.setNoDictSortColumnSchemaOrderMapping(CarbonDataProcessorUtil
         .getColumnIdxBasedOnSchemaInRow(parameters.getCarbonTable()));
     TableSpec tableSpec = new TableSpec(carbonTable, false);
@@ -686,4 +704,20 @@ public class SortParameters implements Serializable {
   public void setNoDictActualPosition(int[] noDictActualPosition) {
     this.noDictActualPosition = noDictActualPosition;
   }
+
+  public DataType[] getNoDictSchemaDataType() {
+    return noDictSchemaDataType;
+  }
+
+  public void setNoDictSchemaDataType(DataType[] noDictSchemaDataType) {
+    this.noDictSchemaDataType = noDictSchemaDataType;
+  }
+
+  public void setSortColumnSchemaOrderMap(Map<Integer, List<Boolean>> sortColumnSchemaOrderMap) {
+    this.sortColumnSchemaOrderMap = sortColumnSchemaOrderMap;
+  }
+
+  public Map<Integer, List<Boolean>> getSortColumnSchemaOrderMap() {
+    return sortColumnSchemaOrderMap;
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 4e3bcff..84fb4d0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -106,7 +106,9 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
   public SortTempFileChunkHolder(SortParameters sortParameters) {
     this.tableFieldStat = new TableFieldStat(sortParameters);
     this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
-        tableFieldStat.getNoDictDataType(), tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+        tableFieldStat.getNoDictSchemaDataType(),
+        tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
+        tableFieldStat.getSortColSchemaOrderMap());
     this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
   }
 
@@ -130,9 +132,10 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
                 true));
     this.convertToActualField = convertToActualField;
     if (this.convertToActualField) {
-      this.comparator = new FileMergeSortComparator(
-          tableFieldStat.getIsSortColNoDictFlags(), tableFieldStat.getNoDictDataType(),
-          tableFieldStat.getNoDictSortColumnSchemaOrderMapping());
+      this.comparator = new FileMergeSortComparator(tableFieldStat.getIsSortColNoDictFlags(),
+          tableFieldStat.getNoDictSchemaDataType(),
+          tableFieldStat.getNoDictSortColumnSchemaOrderMapping(),
+          tableFieldStat.getSortColSchemaOrderMap());
     } else {
       this.comparator =
           new IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 3ffc591..df7b873 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.processing.sort.sortdata;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -80,6 +81,10 @@ public class TableFieldStat implements Serializable {
    */
   private int[] noDictSortColumnSchemaOrderMapping;
 
+  private DataType[] noDictSchemaDataType;
+
+  private Map<Integer, List<Boolean>> sortColSchemaOrderMap;
+
   public TableFieldStat(SortParameters sortParameters) {
     int noDictDimCnt = sortParameters.getNoDictionaryCount();
     int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
@@ -100,6 +105,8 @@ public class TableFieldStat implements Serializable {
     this.noDictDataType = sortParameters.getNoDictDataType();
     this.noDictSortDataType = sortParameters.getNoDictSortDataType();
     this.noDictNoSortDataType = sortParameters.getNoDictNoSortDataType();
+    this.noDictSchemaDataType = sortParameters.getNoDictSchemaDataType();
+    this.sortColSchemaOrderMap = sortParameters.getSortColumnSchemaOrderMap();
     for (boolean flag : isVarcharDimFlags) {
       if (flag) {
         varcharDimCnt++;
@@ -352,4 +359,11 @@ public class TableFieldStat implements Serializable {
     return otherCols;
   }
 
+  public Map<Integer, List<Boolean>> getSortColSchemaOrderMap() {
+    return sortColSchemaOrderMap;
+  }
+
+  public DataType[] getNoDictSchemaDataType() {
+    return noDictSchemaDataType;
+  }
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 1037455..ceaa05c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.constants.SortScopeOptions;
 import org.apache.carbondata.core.metadata.DatabaseLocationProvider;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -363,12 +364,12 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * Get the no dictionary data types on the table
+   * Get the no dictionary sort data types on the table
    *
    * @param carbonTable
    * @return
    */
-  public static DataType[] getNoDictDataTypes(CarbonTable carbonTable) {
+  public static DataType[] getNoDictSortDataTypes(CarbonTable carbonTable) {
     List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
     List<DataType> type = new ArrayList<>();
     for (int i = 0; i < dimensions.size(); i++) {
@@ -380,6 +381,21 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * Get all the no dictionary data types on the table
+   */
+  public static DataType[] getNoDictDataTypes(CarbonTable carbonTable) {
+    List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
+    List<DataType> type = new ArrayList<>();
+    for (CarbonDimension dimension : dimensions) {
+      if (!dimension.hasEncoding(Encoding.DICTIONARY)
+          && dimension.getDataType() != DataTypes.DATE) {
+        type.add(dimension.getDataType());
+      }
+    }
+    return type.toArray(new DataType[type.size()]);
+  }
+
+  /**
    * get visible no dictionary dimensions as per data field order
    *
    * @param dataFields
@@ -425,6 +441,38 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * Get the sort/no_sort column map based on schema order.
+   * This will be used in the final sort step to find the index of sort column, to compare the
+   * intermediate row data based on schema.
+   */
+  public static Map<Integer, List<Boolean>> getSortColSchemaOrderMapping(CarbonTable carbonTable) {
+    List<CarbonDimension> dimensions = carbonTable.getVisibleDimensions();
+    Map<Integer, List<Boolean>> sortColSchemaOrderMap = new HashMap<>();
+    for (CarbonDimension dimension : dimensions) {
+      List<Boolean> sortDictOrNoDictMap = new ArrayList<>();
+      // check if the dimension is sort column or not and add to first index of sortDictOrNoDictMap
+      // check if the dimension is dict column or not and add to second index of sortDictOrNoDictMap
+      if (dimension.isSortColumn()) {
+        sortDictOrNoDictMap.add(true);
+        if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+          sortDictOrNoDictMap.add(true);
+        } else {
+          sortDictOrNoDictMap.add(false);
+        }
+      } else {
+        sortDictOrNoDictMap.add(false);
+        if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+          sortDictOrNoDictMap.add(true);
+        } else {
+          sortDictOrNoDictMap.add(false);
+        }
+      }
+      sortColSchemaOrderMap.put(dimension.getOrdinal(), sortDictOrNoDictMap);
+    }
+    return sortColSchemaOrderMap;
+  }
+
+  /**
    * get mapping based on data fields order
    *
    * @param dataFields
diff --git a/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
new file mode 100644
index 0000000..46c7e7b
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/sort/sortdata/FileMergeSortComparatorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.TableSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the schema-order helpers of {@link CarbonDataProcessorUtil} together with
+ * {@link FileMergeSortComparator} on a table whose first column is not a sort column.
+ */
+public class FileMergeSortComparatorTest {
+
+  /**
+   * Creates the schema of a dimension column with a random unique id.
+   *
+   * @param colName name of the column
+   * @param isSortColumn when true the column is marked as a sort column
+   * @param isDictColumn when true the DICTIONARY encoding is added to the column
+   * @param dataType data type of the column
+   * @return the populated column schema
+   */
+  static ColumnSchema getDimensionColumn(String colName, Boolean isSortColumn, Boolean isDictColumn,
+      DataType dataType) {
+    ColumnSchema dimColumn = new ColumnSchema();
+    dimColumn.setColumnName(colName);
+    dimColumn.setDataType(dataType);
+    dimColumn.setColumnUniqueId(UUID.randomUUID().toString());
+    dimColumn.setDimensionColumn(true);
+    List<Encoding> encodeList =
+        new ArrayList<Encoding>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    if (isDictColumn) {
+      encodeList.add(Encoding.DICTIONARY);
+    }
+    dimColumn.setEncodingList(encodeList);
+    if (isSortColumn) {
+      dimColumn.setSortColumn(true);
+    }
+    return dimColumn;
+  }
+
+  /**
+   * Builds a table schema with four dimensions: col1 (INT, no sort, no dictionary),
+   * col2 (STRING, sort, no dictionary), col3 (LONG, sort, no dictionary) and
+   * col4 (STRING, sort, dictionary).
+   */
+  static TableSchema getTableSchema() {
+    TableSchema tableSchema = new TableSchema();
+    List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>();
+    columnSchemaList.add(getDimensionColumn("col1", false, false, DataTypes.INT));
+    columnSchemaList.add(getDimensionColumn("col2", true, false, DataTypes.STRING));
+    columnSchemaList.add(getDimensionColumn("col3", true, false, DataTypes.LONG));
+    columnSchemaList.add(getDimensionColumn("col4", true, true, DataTypes.STRING));
+    tableSchema.setListOfColumns(columnSchemaList);
+    tableSchema.setTableId(UUID.randomUUID().toString());
+    tableSchema.setTableName("carbonTestTable");
+    return tableSchema;
+  }
+
+  /**
+   * Wraps {@link #getTableSchema()} in a {@link TableInfo} for the test database.
+   */
+  private static TableInfo getTableInfo() {
+    TableInfo info = new TableInfo();
+    info.setDatabaseName("carbonTestDatabase");
+    info.setTableUniqueName("carbonTestDatabase_carbonTestTable");
+    info.setFactTable(getTableSchema());
+    info.setTablePath("testore");
+    return info;
+  }
+
+  /**
+   * Asserts the [sort column, dictionary] flags stored in the mapping for one column ordinal.
+   */
+  private static void assertFlags(Map<Integer, List<Boolean>> mapping, int ordinal,
+      boolean expectedSortColumn, boolean expectedDictionary) {
+    List<Boolean> flags = mapping.get(ordinal);
+    Assert.assertNotNull("no mapping entry for ordinal " + ordinal, flags);
+    Assert.assertEquals("sort column flag of ordinal " + ordinal, expectedSortColumn,
+        flags.get(0));
+    Assert.assertEquals("dictionary flag of ordinal " + ordinal, expectedDictionary,
+        flags.get(1));
+  }
+
+  /**
+   * Checks the no-dictionary data types, the sort/dictionary mapping and the ordering of two
+   * intermediate rows. JUnit assertions are used instead of the {@code assert} keyword so the
+   * checks run even when JVM assertions are disabled.
+   */
+  @Test
+  public void testFileMergeSortComparator() {
+    CarbonTable carbonTable = CarbonTable.buildFromTableInfo(getTableInfo());
+    // test get noDictDataTypes: col1, col2 and col3 are the columns without DICTIONARY encoding
+    DataType[] noDictDataTypes = CarbonDataProcessorUtil.getNoDictDataTypes(carbonTable);
+    Assert.assertArrayEquals(
+        new DataType[] { DataTypes.INT, DataTypes.STRING, DataTypes.LONG }, noDictDataTypes);
+
+    // test getSortColSchemaOrderMapping
+    Map<Integer, List<Boolean>> sortColSchemaOrderMapping =
+        CarbonDataProcessorUtil.getSortColSchemaOrderMapping(carbonTable);
+    assertFlags(sortColSchemaOrderMapping, 0, false, false);
+    assertFlags(sortColSchemaOrderMapping, 1, true, false);
+    assertFlags(sortColSchemaOrderMapping, 2, true, false);
+    assertFlags(sortColSchemaOrderMapping, 3, true, true);
+
+    // test comparator
+    // NOTE(review): the schema above declares three sort columns (col2, col3, col4) but this
+    // array has two entries - confirm this is the shape FileMergeSortComparator expects.
+    boolean[] isSortColNoDict = { true, false };
+    int[] columnIdxBasedOnSchemaInRow =
+        CarbonDataProcessorUtil.getColumnIdxBasedOnSchemaInRow(carbonTable);
+    FileMergeSortComparator comparator =
+        new FileMergeSortComparator(isSortColNoDict, noDictDataTypes, columnIdxBasedOnSchemaInRow,
+            sortColSchemaOrderMapping);
+
+    // prepare data for final sort; the two rows differ in every no-dictionary value
+    int[] dictSortDims1 = { 1 };
+    Object[] noDictSortDims1 = { 1, new byte[] { 98, 99, 104 }, 2 };
+    byte[] noSortDimsAndMeasures1 = {};
+    IntermediateSortTempRow row1 =
+        new IntermediateSortTempRow(dictSortDims1, noDictSortDims1, noSortDimsAndMeasures1);
+
+    int[] dictSortDims = { 1 };
+    Object[] noDictSortDims = { 2, new byte[] { 98, 99, 100 }, 1 };
+    byte[] noSortDimsAndMeasures = {};
+    IntermediateSortTempRow row2 =
+        new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
+
+    // row1 holds the smaller first value (1 vs 2) yet must sort after row2; presumably the
+    // non-sort col1 is skipped and the col2 bytes decide - verify against the comparator.
+    Assert.assertTrue("row1 is expected to sort after row2", comparator.compare(row1, row2) > 0);
+  }
+
+}