You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:42:00 UTC

[carbondata] 40/41: [CARBONDATA-3335]Fixed load and compaction failure after alter done in older version

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 6a2c072f5031488b82f981f23b34623d214ef0d5
Author: kumarvishal09 <ku...@gmail.com>
AuthorDate: Fri Mar 29 06:32:04 2019 +0530

    [CARBONDATA-3335]Fixed load and compaction failure after alter done in older version
    
    No-sort load/compaction fails in the latest version on a table that was altered in an older version. This is because the sort step's output is arranged by sort-column order, whereas the writer expects the data in schema order. This PR handles this by rearranging the sort output into schema order. Tested with 3.5 billion records; performance is unchanged.
    
    This closes #3168
---
 .../carbondata/core/datastore/TableSpec.java       | 93 ++++++++++++++++------
 .../loading/row/IntermediateSortTempRow.java       |  8 ++
 .../loading/sort/SortStepRowHandler.java           |  7 +-
 .../loading/sort/unsafe/UnsafeCarbonRowPage.java   |  8 +-
 .../holder/UnsafeSortTempFileChunkHolder.java      | 16 +++-
 .../processing/sort/DummyRowUpdater.java           | 40 ++++++++++
 .../processing/sort/SchemaBasedRowUpdater.java     | 91 +++++++++++++++++++++
 .../processing/sort/SortTempRowUpdater.java        | 40 ++++++++++
 .../processing/sort/sortdata/SortParameters.java   | 54 +++++++++++++
 .../sort/sortdata/SortTempFileChunkHolder.java     | 15 +++-
 .../processing/sort/sortdata/TableFieldStat.java   | 16 ++++
 .../carbondata/processing/store/TablePage.java     | 49 ++----------
 12 files changed, 364 insertions(+), 73 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
index 002104a..d0b8b3c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -50,6 +50,12 @@ public class TableSpec {
 
   private CarbonTable carbonTable;
 
+  private boolean isUpdateDictDim;
+
+  private boolean isUpdateNoDictDims;
+  private int[] dictDimActualPosition;
+  private int[] noDictDimActualPosition;
+
   public TableSpec(CarbonTable carbonTable) {
     this.carbonTable = carbonTable;
     List<CarbonDimension> dimensions =
@@ -71,10 +77,12 @@ public class TableSpec {
   }
 
   private void addDimensions(List<CarbonDimension> dimensions) {
-    List<DimensionSpec> sortDimSpec = new ArrayList<>();
-    List<DimensionSpec> noSortDimSpec = new ArrayList<>();
+    List<DimensionSpec> dictSortDimSpec = new ArrayList<>();
+    List<DimensionSpec> noSortDictDimSpec = new ArrayList<>();
     List<DimensionSpec> noSortNoDictDimSpec = new ArrayList<>();
-    List<DimensionSpec> sortNoDictDimSpec = new ArrayList<>();
+    List<DimensionSpec> noDictSortDimSpec = new ArrayList<>();
+    List<DimensionSpec> dictDimensionSpec = new ArrayList<>();
+    int dimIndex = 0;
     DimensionSpec spec;
     short actualPosition = 0;
     // sort step's output is based on sort column order i.e sort columns data will be present
@@ -83,40 +91,61 @@ public class TableSpec {
       CarbonDimension dimension = dimensions.get(i);
       if (dimension.isComplex()) {
         spec = new DimensionSpec(ColumnType.COMPLEX, dimension, actualPosition++);
+        dimensionSpec[dimIndex++] = spec;
+        noDictionaryDimensionSpec.add(spec);
+        noSortNoDictDimSpec.add(spec);
       } else if (dimension.getDataType() == DataTypes.TIMESTAMP && !dimension
           .isDirectDictionaryEncoding()) {
         spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension, actualPosition++);
+        dimensionSpec[dimIndex++] = spec;
+        noDictionaryDimensionSpec.add(spec);
+        if (dimension.isSortColumn()) {
+          noDictSortDimSpec.add(spec);
+        } else {
+          noSortNoDictDimSpec.add(spec);
+        }
       } else if (dimension.isDirectDictionaryEncoding()) {
         spec = new DimensionSpec(ColumnType.DIRECT_DICTIONARY, dimension, actualPosition++);
+        dimensionSpec[dimIndex++] = spec;
+        dictDimensionSpec.add(spec);
+        if (dimension.isSortColumn()) {
+          dictSortDimSpec.add(spec);
+        } else {
+          noSortDictDimSpec.add(spec);
+        }
       } else if (dimension.isGlobalDictionaryEncoding()) {
         spec = new DimensionSpec(ColumnType.GLOBAL_DICTIONARY, dimension, actualPosition++);
-      } else {
-        spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension, actualPosition++);
-      }
-      if (dimension.isSortColumn()) {
-        sortDimSpec.add(spec);
-        if (!dimension.isDirectDictionaryEncoding() && !dimension.isGlobalDictionaryEncoding()
-            || spec.getColumnType() == ColumnType.COMPLEX) {
-          sortNoDictDimSpec.add(spec);
+        dimensionSpec[dimIndex++] = spec;
+        dictDimensionSpec.add(spec);
+        if (dimension.isSortColumn()) {
+          dictSortDimSpec.add(spec);
+        } else {
+          noSortDictDimSpec.add(spec);
         }
       } else {
-        noSortDimSpec.add(spec);
-        if (!dimension.isDirectDictionaryEncoding() && !dimension.isGlobalDictionaryEncoding()
-            || spec.getColumnType() == ColumnType.COMPLEX) {
+        spec = new DimensionSpec(ColumnType.PLAIN_VALUE, dimension, actualPosition++);
+        dimensionSpec[dimIndex++] = spec;
+        noDictionaryDimensionSpec.add(spec);
+        if (dimension.isSortColumn()) {
+          noDictSortDimSpec.add(spec);
+        } else {
           noSortNoDictDimSpec.add(spec);
         }
       }
     }
-    // combine the result
-    final DimensionSpec[] sortDimensionSpecs =
-        sortDimSpec.toArray(new DimensionSpec[sortDimSpec.size()]);
-    final DimensionSpec[] noSortDimensionSpecs =
-        noSortDimSpec.toArray(new DimensionSpec[noSortDimSpec.size()]);
-    System.arraycopy(sortDimensionSpecs, 0, dimensionSpec, 0, sortDimensionSpecs.length);
-    System.arraycopy(noSortDimensionSpecs, 0, dimensionSpec, sortDimensionSpecs.length,
-        noSortDimensionSpecs.length);
-    noDictionaryDimensionSpec.addAll(sortNoDictDimSpec);
-    noDictionaryDimensionSpec.addAll(noSortNoDictDimSpec);
+    noDictSortDimSpec.addAll(noSortNoDictDimSpec);
+    dictSortDimSpec.addAll(noSortDictDimSpec);
+
+    this.dictDimActualPosition = new int[dictSortDimSpec.size()];
+    this.noDictDimActualPosition = new int[noDictSortDimSpec.size()];
+    for (int i = 0; i < dictDimActualPosition.length; i++) {
+      dictDimActualPosition[i] = dictSortDimSpec.get(i).getActualPostion();
+    }
+    for (int i = 0; i < noDictDimActualPosition.length; i++) {
+      noDictDimActualPosition[i] = noDictSortDimSpec.get(i).getActualPostion();
+    }
+    isUpdateNoDictDims = !noDictSortDimSpec.equals(noDictionaryDimensionSpec);
+    isUpdateDictDim = !dictSortDimSpec.equals(dictDimensionSpec);
   }
 
   private void addMeasures(List<CarbonMeasure> measures) {
@@ -126,6 +155,22 @@ public class TableSpec {
     }
   }
 
+  public int[] getDictDimActualPosition() {
+    return dictDimActualPosition;
+  }
+
+  public int[] getNoDictDimActualPosition() {
+    return noDictDimActualPosition;
+  }
+
+  public boolean isUpdateDictDim() {
+    return isUpdateDictDim;
+  }
+
+  public boolean isUpdateNoDictDims() {
+    return isUpdateNoDictDims;
+  }
+
   /**
    * No dictionary and complex dimensions of the table
    *
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 844e45e..0207752 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -64,4 +64,12 @@ public class IntermediateSortTempRow {
   public byte[] getNoSortDimsAndMeasures() {
     return noSortDimsAndMeasures;
   }
+
+  public void setNoDictData(Object[] noDictSortDims) {
+    this.noDictSortDims = noDictSortDims;
+  }
+
+  public void setDictData(int[] dictData) {
+    this.dictSortDims = dictData;
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index fa12dcc..8a0f8ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+import org.apache.carbondata.processing.sort.SortTempRowUpdater;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
@@ -78,6 +79,8 @@ public class SortStepRowHandler implements Serializable {
 
   private boolean[] noDictNoSortColMapping;
 
+  private SortTempRowUpdater sortTempRowUpdater;
+
   /**
    * constructor
    * @param tableFieldStat table field stat
@@ -108,6 +111,7 @@ public class SortStepRowHandler implements Serializable {
     for (int i = 0; i < noDictNoSortDataTypes.length; i++) {
       noDictNoSortColMapping[i] = DataTypeUtil.isPrimitiveColumn(noDictNoSortDataTypes[i]);
     }
+    this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
   }
 
   /**
@@ -167,8 +171,7 @@ public class SortStepRowHandler implements Serializable {
       for (int idx = 0; idx < this.measureCnt; idx++) {
         measures[idx] = row[this.measureIdx[idx]];
       }
-
-      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
+      sortTempRowUpdater.updateOutputRow(holder, dictDims, nonDictArray, measures);
     } catch (Exception e) {
       throw new RuntimeException("Problem while converting row to 3 parts", e);
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
index 21403b0..6cf1a25 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/UnsafeCarbonRowPage.java
@@ -28,6 +28,7 @@ import org.apache.carbondata.core.memory.UnsafeSortMemoryManager;
 import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.processing.sort.SortTempRowUpdater;
 import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
 
 /**
@@ -50,6 +51,8 @@ public class UnsafeCarbonRowPage {
   private SortStepRowHandler sortStepRowHandler;
   private boolean convertNoSortFields;
 
+  private SortTempRowUpdater sortTempRowUpdater;
+
   public UnsafeCarbonRowPage(TableFieldStat tableFieldStat, MemoryBlock memoryBlock,
       String taskId) {
     this.tableFieldStat = tableFieldStat;
@@ -60,6 +63,7 @@ public class UnsafeCarbonRowPage {
     // TODO Only using 98% of space for safe side.May be we can have different logic.
     sizeToBeUsed = dataBlock.size() - (dataBlock.size() * 5) / 100;
     this.managerType = MemoryManagerType.UNSAFE_MEMORY_MANAGER;
+    this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
   }
 
   public int addRow(Object[] row,
@@ -93,8 +97,10 @@ public class UnsafeCarbonRowPage {
    */
   public IntermediateSortTempRow getRow(long address) {
     if (convertNoSortFields) {
-      return sortStepRowHandler
+      IntermediateSortTempRow intermediateSortTempRow = sortStepRowHandler
           .readRowFromMemoryWithNoSortFieldConvert(dataBlock.getBaseObject(), address);
+      this.sortTempRowUpdater.updateSortTempRow(intermediateSortTempRow);
+      return intermediateSortTempRow;
     } else {
       return sortStepRowHandler
           .readFromMemoryWithoutNoSortFieldConvert(dataBlock.getBaseObject(), address);
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
index 04cab70..7fcfc0e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.processing.sort.SortTempRowUpdater;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.IntermediateSortTempRowComparator;
 import org.apache.carbondata.processing.sort.sortdata.SortParameters;
@@ -98,6 +99,8 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
   private SortStepRowHandler sortStepRowHandler;
   private Comparator<IntermediateSortTempRow> comparator;
   private boolean convertNoSortFields;
+
+  private SortTempRowUpdater sortTempRowUpdater;
   /**
    * Constructor to initialize
    */
@@ -113,6 +116,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     comparator = new IntermediateSortTempRowComparator(parameters.getNoDictionarySortColumn(),
         parameters.getNoDictDataType());
     this.convertNoSortFields = convertNoSortFields;
+    this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
     initialize();
   }
 
@@ -168,7 +172,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     } else {
       try {
         if (convertNoSortFields) {
-          this.returnRow = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+          IntermediateSortTempRow intermediateSortTempRow =
+              sortStepRowHandler.readWithNoSortFieldConvert(stream);
+          sortTempRowUpdater
+              .updateSortTempRow(intermediateSortTempRow);
+          this.returnRow = intermediateSortTempRow;
         } else {
           this.returnRow = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
         }
@@ -220,7 +228,11 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder {
     IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
     for (int i = 0; i < expected; i++) {
       if (convertNoSortFields) {
-        holders[i] = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+        IntermediateSortTempRow intermediateSortTempRow =
+            sortStepRowHandler.readWithNoSortFieldConvert(stream);
+        sortTempRowUpdater
+            .updateSortTempRow(intermediateSortTempRow);
+        holders[i] = intermediateSortTempRow;
       } else {
         holders[i] = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
       }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/DummyRowUpdater.java b/processing/src/main/java/org/apache/carbondata/processing/sort/DummyRowUpdater.java
new file mode 100644
index 0000000..f86a89c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/DummyRowUpdater.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort;
+
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+
+/**
+ * This class will be used when the order is not changed, so there is no need to update the row
+ */
+public class DummyRowUpdater implements SortTempRowUpdater {
+
+  private static final long serialVersionUID = 5989093890994039617L;
+
+  @Override public void updateSortTempRow(IntermediateSortTempRow intermediateSortTempRow) {
+    // DO NOTHING
+  }
+
+  @Override public void updateOutputRow(Object[] out, int[] dimArray, Object[] noDictArray,
+      Object[] measureArray) {
+    out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
+    out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = noDictArray;
+    out[WriteStepRowUtil.MEASURE] = measureArray;
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/SchemaBasedRowUpdater.java b/processing/src/main/java/org/apache/carbondata/processing/sort/SchemaBasedRowUpdater.java
new file mode 100644
index 0000000..2fca803
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/SchemaBasedRowUpdater.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort;
+
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+
+/**
+ * Below class will be used to update the sort output row based on schema order during final merge.
+ * This is required because in older versions (e.g. 1.1) alter add column was supported
+ * only with sort columns, and the sort step returns the data based on
+ * sort column order (sort columns first), whereas the writer step expects the schema order,
+ * so we need to rearrange the row based on schema order
+ */
+public class SchemaBasedRowUpdater implements SortTempRowUpdater {
+
+  private static final long serialVersionUID = -8864989617597611912L;
+
+  private boolean isUpdateDictDims;
+
+  private boolean isUpdateNonDictDims;
+
+  private int[] dictDimActualPosition;
+
+  private int[] noDictActualPosition;
+
+  public SchemaBasedRowUpdater(int[] dictDimActualPosition, int[] noDictActualPosition,
+      boolean isUpdateDictDims, boolean isUpdateNonDictDims) {
+    this.dictDimActualPosition = dictDimActualPosition;
+    this.noDictActualPosition = noDictActualPosition;
+    this.isUpdateDictDims = isUpdateDictDims;
+    this.isUpdateNonDictDims = isUpdateNonDictDims;
+  }
+
+  @Override public void updateSortTempRow(IntermediateSortTempRow intermediateSortTempRow) {
+    int[] dictSortDims = intermediateSortTempRow.getDictSortDims();
+    if (isUpdateDictDims) {
+      int[] dimArrayNew = new int[intermediateSortTempRow.getDictSortDims().length];
+      for (int i = 0; i < dictSortDims.length; i++) {
+        dimArrayNew[dictDimActualPosition[i]] = dictSortDims[i];
+      }
+      dictSortDims = dimArrayNew;
+    }
+    Object[] noDictSortDims = intermediateSortTempRow.getNoDictSortDims();
+    if (isUpdateNonDictDims) {
+      Object[] noDictArrayNew = new Object[noDictSortDims.length];
+      for (int i = 0; i < noDictArrayNew.length; i++) {
+        noDictArrayNew[noDictActualPosition[i]] = noDictSortDims[i];
+      }
+      noDictSortDims = noDictArrayNew;
+    }
+    intermediateSortTempRow.setDictData(dictSortDims);
+    intermediateSortTempRow.setNoDictData(noDictSortDims);
+  }
+
+  @Override public void updateOutputRow(Object[] out, int[] dimArray, Object[] noDictArray,
+      Object[] measureArray) {
+    if (isUpdateDictDims) {
+      int[] dimArrayNew = new int[dimArray.length];
+      for (int i = 0; i < dimArray.length; i++) {
+        dimArrayNew[dictDimActualPosition[i]] = dimArray[i];
+      }
+      dimArray = dimArrayNew;
+    }
+    if (isUpdateNonDictDims) {
+      Object[] noDictArrayNew = new Object[noDictArray.length];
+      for (int i = 0; i < noDictArrayNew.length; i++) {
+        noDictArrayNew[noDictActualPosition[i]] = noDictArray[i];
+      }
+      noDictArray = noDictArrayNew;
+    }
+    out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
+    out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = noDictArray;
+    out[WriteStepRowUtil.MEASURE] = measureArray;
+  }
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/SortTempRowUpdater.java b/processing/src/main/java/org/apache/carbondata/processing/sort/SortTempRowUpdater.java
new file mode 100644
index 0000000..7b3fd4b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/SortTempRowUpdater.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
+
+/**
+ * Below interface is used to update the sort output row based on schema order during final merge.
+ * This is required because in older versions (e.g. 1.1) alter add column was supported
+ * only with sort columns, and the sort step returns the data based on
+ * sort column order (sort columns first), whereas the writer step expects the schema order,
+ * so we need to rearrange the row based on schema order
+ */
+public interface SortTempRowUpdater extends Serializable {
+
+  /**
+   * @param intermediateSortTempRow
+   */
+  void updateSortTempRow(IntermediateSortTempRow intermediateSortTempRow);
+
+  void updateOutputRow(Object[] out, int[] dimArray,
+      Object[] noDictArray, Object[] measureArray);
+}
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 6fec8dc..ffc7416 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -144,6 +145,14 @@ public class SortParameters implements Serializable {
    */
   private CarbonTable carbonTable;
 
+  private boolean isUpdateDictDims;
+
+  private boolean isUpdateNonDictDims;
+
+  private int[] dictDimActualPosition;
+
+  private int[] noDictActualPosition;
+
   public SortParameters getCopy() {
     SortParameters parameters = new SortParameters();
     parameters.tempFileLocation = tempFileLocation;
@@ -178,6 +187,10 @@ public class SortParameters implements Serializable {
     parameters.batchSortSizeinMb = batchSortSizeinMb;
     parameters.rangeId = rangeId;
     parameters.carbonTable = carbonTable;
+    parameters.isUpdateDictDims = isUpdateDictDims;
+    parameters.isUpdateNonDictDims = isUpdateNonDictDims;
+    parameters.dictDimActualPosition = dictDimActualPosition;
+    parameters.noDictActualPosition = noDictActualPosition;
     return parameters;
   }
 
@@ -473,6 +486,10 @@ public class SortParameters implements Serializable {
         .getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
     parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
     parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
+    parameters.setNoDictActualPosition(configuration.getTableSpec().getNoDictDimActualPosition());
+    parameters.setDictDimActualPosition(configuration.getTableSpec().getDictDimActualPosition());
+    parameters.setUpdateDictDims(configuration.getTableSpec().isUpdateDictDim());
+    parameters.setUpdateNonDictDims(configuration.getTableSpec().isUpdateNoDictDims());
     return parameters;
   }
 
@@ -556,6 +573,11 @@ public class SortParameters implements Serializable {
     parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
         .getNoDictSortColMapping(parameters.getCarbonTable()));
+    TableSpec tableSpec = new TableSpec(carbonTable);
+    parameters.setNoDictActualPosition(tableSpec.getNoDictDimActualPosition());
+    parameters.setDictDimActualPosition(tableSpec.getDictDimActualPosition());
+    parameters.setUpdateDictDims(tableSpec.isUpdateDictDim());
+    parameters.setUpdateNonDictDims(tableSpec.isUpdateNoDictDims());
     return parameters;
   }
 
@@ -590,4 +612,36 @@ public class SortParameters implements Serializable {
   public void setSortColumn(boolean[] sortColumn) {
     this.sortColumn = sortColumn;
   }
+
+  public boolean isUpdateDictDims() {
+    return isUpdateDictDims;
+  }
+
+  public void setUpdateDictDims(boolean updateDictDims) {
+    isUpdateDictDims = updateDictDims;
+  }
+
+  public boolean isUpdateNonDictDims() {
+    return isUpdateNonDictDims;
+  }
+
+  public void setUpdateNonDictDims(boolean updateNonDictDims) {
+    isUpdateNonDictDims = updateNonDictDims;
+  }
+
+  public int[] getDictDimActualPosition() {
+    return dictDimActualPosition;
+  }
+
+  public void setDictDimActualPosition(int[] dictDimActualPosition) {
+    this.dictDimActualPosition = dictDimActualPosition;
+  }
+
+  public int[] getNoDictActualPosition() {
+    return noDictActualPosition;
+  }
+
+  public void setNoDictActualPosition(int[] noDictActualPosition) {
+    this.noDictActualPosition = noDictActualPosition;
+  }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
index 2ae90fa..9e9bac1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortTempFileChunkHolder.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
 import org.apache.carbondata.processing.loading.sort.SortStepRowHandler;
+import org.apache.carbondata.processing.sort.SortTempRowUpdater;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
 import org.apache.log4j.Logger;
@@ -46,6 +47,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
    */
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(SortTempFileChunkHolder.class.getName());
+  private SortTempRowUpdater sortTempRowUpdater;
 
   /**
    * temp file
@@ -106,6 +108,7 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     this.comparator =
         new IntermediateSortTempRowComparator(tableFieldStat.getIsSortColNoDictFlags(),
             tableFieldStat.getNoDictDataType());
+    this.sortTempRowUpdater = tableFieldStat.getSortTempRowUpdater();
   }
 
   /**
@@ -180,7 +183,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     } else {
       try {
         if (convertToActualField) {
-          this.returnRow = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+          IntermediateSortTempRow intermediateSortTempRow =
+              sortStepRowHandler.readWithNoSortFieldConvert(stream);
+          this.sortTempRowUpdater
+              .updateSortTempRow(intermediateSortTempRow);
+          this.returnRow = intermediateSortTempRow;
         } else {
           this.returnRow = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
         }
@@ -230,7 +237,11 @@ public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHold
     IntermediateSortTempRow[] holders = new IntermediateSortTempRow[expected];
     for (int i = 0; i < expected; i++) {
       if (convertToActualField) {
-        holders[i] = sortStepRowHandler.readWithNoSortFieldConvert(stream);
+        IntermediateSortTempRow intermediateSortTempRow =
+            sortStepRowHandler.readWithNoSortFieldConvert(stream);
+        this.sortTempRowUpdater
+            .updateSortTempRow(intermediateSortTempRow);
+        holders[i] = intermediateSortTempRow;
       } else {
         holders[i] = sortStepRowHandler.readWithoutNoSortFieldConvert(stream);
       }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index ef92bbc..9553bc9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -24,6 +24,9 @@ import java.util.Objects;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.sort.DummyRowUpdater;
+import org.apache.carbondata.processing.sort.SchemaBasedRowUpdater;
+import org.apache.carbondata.processing.sort.SortTempRowUpdater;
 
 /**
  * This class is used to hold field information for a table during data loading. These information
@@ -66,6 +69,8 @@ public class TableFieldStat implements Serializable {
   // indices for measure columns
   private int[] measureIdx;
 
+  private SortTempRowUpdater sortTempRowUpdater;
+
   public TableFieldStat(SortParameters sortParameters) {
     int noDictDimCnt = sortParameters.getNoDictionaryCount();
     int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
@@ -141,6 +146,13 @@ public class TableFieldStat implements Serializable {
     for (int i = 0; i < measureCnt; i++) {
       measureIdx[i] = base + i;
     }
+    if (sortParameters.isUpdateDictDims() || sortParameters.isUpdateNonDictDims()) {
+      this.sortTempRowUpdater = new SchemaBasedRowUpdater(sortParameters.getDictDimActualPosition(),
+          sortParameters.getNoDictActualPosition(), sortParameters.isUpdateDictDims(),
+          sortParameters.isUpdateNonDictDims());
+    } else {
+      this.sortTempRowUpdater = new DummyRowUpdater();
+    }
   }
 
   public int getDictSortDimCnt() {
@@ -241,4 +253,8 @@ public class TableFieldStat implements Serializable {
   public DataType[] getNoDictDataType() {
     return noDictDataType;
   }
+
+  public SortTempRowUpdater getSortTempRowUpdater() {
+    return sortTempRowUpdater;
+  }
 }
\ No newline at end of file
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 5687549..7cc8932 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -22,6 +22,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -392,14 +393,12 @@ public class TablePage {
   private EncodedColumnPage[] encodeAndCompressDimensions()
       throws KeyGenException, IOException, MemoryException {
     List<EncodedColumnPage> encodedDimensions = new ArrayList<>();
-    EncodedColumnPage[][] complexColumnPages =
-        new EncodedColumnPage[complexDimensionPages.length][];
+    List<EncodedColumnPage> encodedComplexDimensions = new ArrayList<>();
     TableSpec tableSpec = model.getTableSpec();
     int dictIndex = 0;
     int noDictIndex = 0;
     int complexDimIndex = 0;
     int numDimensions = tableSpec.getNumDimensions();
-    int totalComplexColumnSize = 0;
     for (int i = 0; i < numDimensions; i++) {
       ColumnPageEncoder columnPageEncoder;
       EncodedColumnPage encodedPage;
@@ -435,51 +434,17 @@ public class TablePage {
           break;
         case COMPLEX:
           EncodedColumnPage[] encodedPages = ColumnPageEncoder.encodeComplexColumn(
-              complexDimensionPages[complexDimIndex]);
-          complexColumnPages[complexDimIndex] = encodedPages;
-          totalComplexColumnSize += encodedPages.length;
-          complexDimIndex++;
+              complexDimensionPages[complexDimIndex++]);
+          encodedComplexDimensions.addAll(Arrays.asList(encodedPages));
           break;
         default:
           throw new IllegalArgumentException("unsupported dimension type:" + spec
               .getColumnType());
       }
     }
-    // below code is to combine the list based on actual order present in carbon table
-    // in case of older version(eg:1.1) alter add column was supported only with sort columns
-    // and sort step will return the data based on sort column order(sort columns first)
-    // so arranging the column pages based on schema is required otherwise query will
-    // either give wrong result(for string columns) or throw exception in case of non string
-    // column as reading is based on schema order
-    int complexEncodedPageIndex = 0;
-    int normalEncodedPageIndex  = 0;
-    int currentPosition = 0;
-    EncodedColumnPage[] combinedList =
-        new EncodedColumnPage[encodedDimensions.size() + totalComplexColumnSize];
-    for (int i = 0; i < numDimensions; i++) {
-      TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
-      switch (spec.getColumnType()) {
-        case GLOBAL_DICTIONARY:
-        case DIRECT_DICTIONARY:
-        case PLAIN_VALUE:
-          // add the dimension based on actual postion
-          // current position is considered as complex column will have multiple children
-          combinedList[currentPosition + spec.getActualPostion()] =
-              encodedDimensions.get(normalEncodedPageIndex++);
-          break;
-        case COMPLEX:
-          EncodedColumnPage[] complexColumnPage = complexColumnPages[complexEncodedPageIndex++];
-          for (int j = 0; j < complexColumnPage.length; j++) {
-            combinedList[currentPosition + spec.getActualPostion() + j] = complexColumnPage[j];
-          }
-          // as for complex type 1 position is already considered, so subtract -1
-          currentPosition += complexColumnPage.length - 1;
-          break;
-        default:
-          throw new IllegalArgumentException("unsupported dimension type:" + spec.getColumnType());
-      }
-    }
-    return combinedList;
+
+    encodedDimensions.addAll(encodedComplexDimensions);
+    return encodedDimensions.toArray(new EncodedColumnPage[encodedDimensions.size()]);
   }
 
   /**