Posted to commits@carbondata.apache.org by ja...@apache.org on 2020/02/25 15:29:56 UTC

[carbondata] branch master updated: [CARBONDATA-3637] Use optimized insert flow for MV and insert stage command

This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b7facc  [CARBONDATA-3637] Use optimized insert flow for MV and insert stage command
6b7facc is described below

commit 6b7faccde15371e316900bf2e3552c653b0c05e3
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Sat Feb 15 08:42:42 2020 +0530

    [CARBONDATA-3637] Use optimized insert flow for MV and insert stage command
    
    Why is this PR needed?
    The MV and insert stage commands can use the new optimized insert into flow.
    
    While moving them over, a few issues (mostly around partition handling) were also found in the new insert into flow and fixed (sketches of the first two fixes follow this list):
    
    If the catalog table schema is already rearranged, it should not be rearranged again.
    The timestamp converter did not handle a value of 0; it must be set to null when the value is 0.
    While deriving the indexes for the 3-parts row conversion in the new insert flow, the order was based on the internal partition order instead of the table's internal order.
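    
    A minimal sketch of the first fix, the "already rearranged" guard (standalone Scala; the helper name needsReArrange and its inputs are illustrative, the real change is in CarbonInsertIntoCommand.scala in the diff below):
    
        // Rearrange the partition relation only when the current attribute order differs
        // from the selected column schema order; the lengths must match before comparing.
        def needsReArrange(selectedColumnNames: Seq[String],
            relationOutputNames: Seq[String]): Boolean = {
          if (selectedColumnNames.length != relationOutputNames.length) {
            throw new RuntimeException("schema length doesn't match partition relation length")
          }
          selectedColumnNames.zip(relationOutputNames).exists {
            case (selected, output) => selected != output
          }
        }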
    
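    A minimal sketch of the second fix, the timestamp 0 handling (standalone Scala; convertTimestampInPlace is an illustrative name, the real change is in CommonLoadUtils.scala in the diff below):
    
        import org.apache.spark.sql.catalyst.InternalRow
        
        // A stored long of 0 means "no timestamp" in this flow, so write null instead of
        // dividing it down (which would otherwise load as the epoch). Non-zero values are
        // scaled by the granularity divisor (1000 in this patch).
        def convertTimestampInPlace(row: InternalRow, index: Int, divisor: Long = 1000L): Unit = {
          if (row.getLong(index) == 0) {
            row.setNullAt(index)
          } else {
            row.setLong(index, row.getLong(index) / divisor)
          }
        }
    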
    What changes were proposed in this PR?
    
    Changed the MV and insert stage commands to use the optimized insert flow (the sketch after this list shows the new call pattern).
    
    After these changes:
    b. CarbonInsertIntoCommand -- the new optimized flow; used for insert DML, CTAS DML, MV and the insert stage command.
    c. CarbonInsertIntoWithDf -- the old flow with the converter step, which supports bad record handling; still used for the update, compaction, DataFrame writer and alter table scenarios [rearranging currently has some problems in those cases].
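    
    A sketch of the new call pattern shared by both callers (the wrapper insertWithNewFlow is illustrative only; the parameter names and imports follow the patch below):
    
        import org.apache.spark.sql.{DataFrame, SparkSession}
        import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
        import org.apache.carbondata.core.metadata.schema.table.CarbonTable
        
        // Both MV refresh and the insert stage command now build a CarbonInsertIntoCommand
        // from the analyzed logical plan plus the table info, instead of going through
        // CarbonInsertIntoWithDf with a DataFrame.
        def insertWithNewFlow(spark: SparkSession, table: CarbonTable,
            dataFrame: DataFrame, header: String): Unit = {
          CarbonInsertIntoCommand(
            databaseNameOp = Option(table.getDatabaseName),
            tableName = table.getTableName,
            options = scala.collection.immutable.Map("fileheader" -> header),
            isOverwriteTable = false,
            logicalPlan = dataFrame.queryExecution.analyzed,
            tableInfo = table.getTableInfo
          ).run(spark)
        }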
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3615
---
 .../management/CarbonInsertFromStageCommand.scala  | 10 ++-
 .../management/CarbonInsertIntoCommand.scala       | 13 +++-
 .../command/management/CommonLoadUtils.scala       | 10 ++-
 .../table/CarbonCreateTableAsSelectCommand.scala   | 10 +--
 .../mv/extension/MVDataMapProvider.scala           | 14 ++--
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   |  3 +
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  | 13 ++++
 .../loading/CarbonDataLoadConfiguration.java       | 40 +++++------
 .../CarbonRowDataWriterProcessorStepImpl.java      | 78 +++++-----------------
 .../carbondata/processing/store/TablePage.java     |  8 ++-
 10 files changed, 89 insertions(+), 110 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index b44cfe5..64f802f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -321,18 +321,16 @@ case class CarbonInsertFromStageCommand(
         val header = columns.mkString(",")
         val selectColumns = columns.filter(!partition.contains(_))
         val selectedDataFrame = dataFrame.select(selectColumns.head, selectColumns.tail: _*)
-        CarbonInsertIntoWithDf(
+        CarbonInsertIntoCommand(
           databaseNameOp = Option(table.getDatabaseName),
           tableName = table.getTableName,
           options = scala.collection.immutable.Map("fileheader" -> header,
             "binary_decoder" -> "base64"),
           isOverwriteTable = false,
-          dataFrame = selectedDataFrame,
-          updateModel = None,
-          tableInfoOp = None,
-          internalOptions = Map.empty,
+          logicalPlan = selectedDataFrame.queryExecution.analyzed,
+          tableInfo = table.getTableInfo,
           partition = partition
-        ).process(spark)
+        ).run(spark)
     }
     LOGGER.info(s"finish data loading, time taken ${ System.currentTimeMillis() - start }ms")
   }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 525de29..68189e2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -176,8 +176,17 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
       convertedStaticPartition)
     scanResultRdd = sparkSession.sessionState.executePlan(newLogicalPlan).toRdd
     if (logicalPartitionRelation != null) {
-      logicalPartitionRelation =
-        getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+      if (selectedColumnSchema.length != logicalPartitionRelation.output.length) {
+        throw new RuntimeException(" schema length doesn't match partition length")
+      }
+      val isNotReArranged = selectedColumnSchema.zipWithIndex.exists {
+        case (cs, i) => !cs.getColumnName.equals(logicalPartitionRelation.output(i).name)
+      }
+      if (isNotReArranged) {
+        // Re-arrange the catalog table schema and output for partition relation
+        logicalPartitionRelation =
+          getReArrangedSchemaLogicalRelation(reArrangedIndex, logicalPartitionRelation)
+      }
     }
     // Delete stale segment folders that are not in table status but are physically present in
     // the Fact folder
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 5c0780e..0aada09 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, TableDataMap}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.DateDirectDictionaryGenerator
+import org.apache.carbondata.core.keygenerator.directdictionary.timestamp.{DateDirectDictionaryGenerator, TimeStampGranularityTypeValue}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
@@ -728,7 +728,13 @@ object CommonLoadUtils {
     }
     val updatedRdd: RDD[InternalRow] = rdd.map { internalRow =>
       for (index <- timeStampIndex) {
-        internalRow.setLong(index, internalRow.getLong(index) / 1000)
+        if (internalRow.getLong(index) == 0) {
+          internalRow.setNullAt(index)
+        } else {
+          internalRow.setLong(
+            index,
+            internalRow.getLong(index) / TimeStampGranularityTypeValue.MILLIS_SECONDS.getValue)
+        }
       }
       var doubleValue: Double = 0
       for (index <- doubleIndex) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index ddfed80..9725b4e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -41,7 +41,7 @@ case class CarbonCreateTableAsSelectCommand(
     ifNotExistsSet: Boolean = false,
     tableLocation: Option[String] = None) extends AtomicRunnableCommand {
 
-  var loadCommand: CarbonInsertIntoCommand = _
+  var insertIntoCommand: CarbonInsertIntoCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val tableName = tableInfo.getFactTable.getTableName
@@ -76,7 +76,7 @@ case class CarbonCreateTableAsSelectCommand(
         .createCarbonDataSourceHadoopRelation(sparkSession,
           TableIdentifier(tableName, Option(dbName)))
       // execute command to load data into carbon table
-      loadCommand = CarbonInsertIntoCommand(
+      insertIntoCommand = CarbonInsertIntoCommand(
         databaseNameOp = Some(carbonDataSourceHadoopRelation.carbonRelation.databaseName),
         tableName = carbonDataSourceHadoopRelation.carbonRelation.tableName,
         options = scala.collection.immutable
@@ -85,14 +85,14 @@ case class CarbonCreateTableAsSelectCommand(
         isOverwriteTable = false,
         logicalPlan = query,
         tableInfo = tableInfo)
-      loadCommand.processMetadata(sparkSession)
+      insertIntoCommand.processMetadata(sparkSession)
     }
     Seq.empty
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    if (null != loadCommand) {
-      loadCommand.processData(sparkSession)
+    if (null != insertIntoCommand) {
+      insertIntoCommand.processData(sparkSession)
     }
     Seq.empty
   }
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
index 994bc4d..5b400b2 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVDataMapProvider.scala
@@ -21,7 +21,7 @@ import java.io.IOException
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonUtils, SparkSession}
-import org.apache.spark.sql.execution.command.management.CarbonInsertIntoWithDf
+import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.util.SparkSQLUtil
@@ -141,19 +141,17 @@ class MVDataMapProvider(
           !column.getColumnName
             .equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
         }.sortBy(_.getSchemaOrdinal).map(_.getColumnName).mkString(",")
-      val insertWithDf = CarbonInsertIntoWithDf(
+      val insertIntoCommand = CarbonInsertIntoCommand(
         databaseNameOp = Some(identifier.getDatabaseName),
         tableName = identifier.getTableName,
         options = scala.collection.immutable.Map("fileheader" -> header),
         isOverwriteTable,
-        dataFrame = updatedQuery,
-        updateModel = None,
-        tableInfoOp = None,
+        logicalPlan = updatedQuery.queryExecution.analyzed,
+        tableInfo = dataMapTable.getTableInfo,
         internalOptions = Map("mergedSegmentName" -> newLoadName,
-          CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"),
-        partition = Map.empty)
+          CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true"))
       try {
-        insertWithDf.process(sparkSession)
+        insertIntoCommand.run(sparkSession)
       } catch {
         case ex: Exception =>
           // If load to dataMap table fails, disable the dataMap and if newLoad is still
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 55ab5e8..773482f 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -1160,6 +1160,8 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test mv with duplicate columns in query and constant column") {
+    // new optimized insert into flow doesn't support duplicate column names, so send it to old flow
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     sql("drop table if exists maintable")
     sql("create table maintable(name string, age int, add string) STORED AS carbondata")
     sql("create materialized view dupli_mv as select name, sum(age),sum(age) from maintable group by name")
@@ -1178,6 +1180,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     assert(TestUtil.verifyMVDataMap(df4.queryExecution.optimizedPlan, "constant_mv"))
     assert(TestUtil.verifyMVDataMap(df5.queryExecution.optimizedPlan, "dupli_projection"))
     assert(TestUtil.verifyMVDataMap(df6.queryExecution.optimizedPlan, "dupli_projection"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
   }
 
   test("test mv query when the column names and table name same in join scenario") {
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index 11b6078..00b6bd8 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.mv.rewrite.TestUtil
 
 class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
@@ -284,6 +286,9 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test mv timeseries duplicate columns and constant columns") {
+    // new optimized insert into flow doesn't support duplicate column names, so send it to old flow
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     dropDataMap("datamap1")
     sql(
       "create materialized view datamap1 as " +
@@ -300,6 +305,8 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
     val df3 = sql("select timeseries(projectjoindate,'month') ,sum(1)  ex from maintable group by timeseries(projectjoindate,'month')")
     checkPlan("datamap1", df3)
     dropDataMap("datamap1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
   }
 
   test("test mv timeseries with like filters") {
@@ -313,6 +320,10 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test mv timeseries with join scenario") {
+    // this test case datamap table is created with distinct column (2 columns),
+    // but insert projection has duplicate column(3 columns). Cannot support in new insert into flow
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "true")
     sql("drop table if exists secondtable")
     sql(
       "CREATE TABLE secondtable (empno int,empname string, projectcode int, projectjoindate " +
@@ -328,6 +339,8 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
       " from maintable t1 inner join secondtable t2 where" +
       " t2.projectcode = t1.projectcode group by timeseries(t1.projectjoindate,'month')")
     checkPlan("datamap1", df)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_ENABLE_BAD_RECORD_HANDLING_FOR_INSERT, "false")
   }
 
   test("test create materialized view with group by columns not present in projection") {
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7575754..0965ee6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.OutputFilesInfoHolder;
 
 public class CarbonDataLoadConfiguration {
@@ -236,18 +238,12 @@ public class CarbonDataLoadConfiguration {
   }
 
   public DataType[] getMeasureDataType() {
-    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
-    int measureCount = 0;
-    for (int i = 0; i < dataFields.length; i++) {
-      if (!dataFields[i].getColumn().isDimension()) {
-        measureIndexes.add(i);
-        measureCount++;
-      }
-    }
-
-    DataType[] type = new DataType[measureCount];
+    // data field might be rearranged in case of partition.
+    // so refer internal order not the data field order.
+    List<CarbonMeasure> visibleMeasures = tableSpec.getCarbonTable().getVisibleMeasures();
+    DataType[] type = new DataType[visibleMeasures.size()];
     for (int i = 0; i < type.length; i++) {
-      type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
+      type[i] = visibleMeasures.get(i).getDataType();
     }
     return type;
   }
@@ -258,22 +254,16 @@ public class CarbonDataLoadConfiguration {
    * @return
    */
   public CarbonColumn[] getNoDictAndComplexDimensions() {
-    List<Integer> noDicOrCompIndexes = new ArrayList<>(dataFields.length);
-    int noDicCount = 0;
-    for (int i = 0; i < dataFields.length; i++) {
-      if (dataFields[i].getColumn().isDimension() && (
-          dataFields[i].getColumn().getDataType() != DataTypes.DATE || dataFields[i].getColumn()
-              .isComplex())) {
-        noDicOrCompIndexes.add(i);
-        noDicCount++;
+    // data field might be rearranged in case of partition.
+    // so refer internal order not the data field order.
+    List<CarbonDimension> visibleDimensions = tableSpec.getCarbonTable().getVisibleDimensions();
+    List<CarbonColumn> noDictionaryDimensions = new ArrayList<>();
+    for (int i = 0; i < visibleDimensions.size(); i++) {
+      if (visibleDimensions.get(i).getDataType() != DataTypes.DATE) {
+        noDictionaryDimensions.add(visibleDimensions.get(i));
       }
     }
-
-    CarbonColumn[] dims = new CarbonColumn[noDicCount];
-    for (int i = 0; i < dims.length; i++) {
-      dims[i] = dataFields[noDicOrCompIndexes.get(i)].getColumn();
-    }
-    return dims;
+    return noDictionaryDimensions.toArray(new CarbonColumn[0]);
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 93392b5..0f5b203 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -52,7 +52,6 @@ import org.apache.carbondata.processing.store.CarbonFactHandler;
 import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.log4j.Logger;
 
 /**
@@ -170,70 +169,29 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
   }
 
   private void initializeNoReArrangeIndexes() {
-    List<ColumnSchema> listOfColumns =
-        configuration.getTableSpec().getCarbonTable().getTableInfo().getFactTable()
-            .getListOfColumns();
-    List<Integer> internalOrder = new ArrayList<>();
-    List<Integer> invisibleIndex = new ArrayList<>();
-    for (ColumnSchema col : listOfColumns) {
-      // consider the invisible columns other than the dummy measure(-1)
-      if (col.isInvisible() && col.getSchemaOrdinal() != -1) {
-        invisibleIndex.add(col.getSchemaOrdinal());
-      }
-    }
-    int complexChildCount = 0;
-    for (ColumnSchema col : listOfColumns) {
-      if (col.isInvisible()) {
-        continue;
-      }
-      if (col.getColumnName().contains(".")) {
-        // If the schema ordinal is -1,
-        // no need to consider it during shifting columns to derive new shifted ordinal
-        if (col.getSchemaOrdinal() != -1) {
-          complexChildCount = complexChildCount + 1;
-        }
-      } else {
-        // get number of invisible index count before this column
-        int invisibleIndexCount = 0;
-        for (int index : invisibleIndex) {
-          if (index < col.getSchemaOrdinal()) {
-            invisibleIndexCount++;
-          }
-        }
-        if (col.getDataType().isComplexType()) {
-          // Calculate re-arrange index by ignoring the complex child count.
-          // As projection will have only parent columns
-          internalOrder.add(col.getSchemaOrdinal() - complexChildCount - invisibleIndexCount);
-        } else {
-          internalOrder.add(col.getSchemaOrdinal() - invisibleIndexCount);
-        }
-      }
-    }
+    // Data might have partition columns in the end in new insert into flow.
+    // But when convert to 3 parts, just keep in internal order. so derive index for that.
+    List<CarbonColumn> listOfColumns = new ArrayList<>();
+    listOfColumns.addAll(configuration.getTableSpec().getCarbonTable().getVisibleDimensions());
+    listOfColumns.addAll(configuration.getTableSpec().getCarbonTable().getVisibleMeasures());
     // In case of partition, partition data will be at the end. So, need to keep data position
-    List<Pair<DataField, Integer>> dataPositionList = new ArrayList<>();
+    Map<String, Integer> dataPositionMap = new HashMap<>();
     int dataPosition = 0;
     for (DataField field : configuration.getDataFields()) {
-      dataPositionList.add(Pair.of(field, dataPosition++));
-    }
-    // convert to original create order
-    dataPositionList.sort(Comparator.comparingInt(p -> p.getKey().getColumn().getSchemaOrdinal()));
-    // re-arranged data fields
-    List<Pair<DataField, Integer>> reArrangedDataFieldList = new ArrayList<>();
-    for (int index : internalOrder) {
-      reArrangedDataFieldList.add(dataPositionList.get(index));
+      dataPositionMap.put(field.getColumn().getColName(), dataPosition++);
     }
-    // get the index of each type and used for 3 parts conversion
-    for (Pair<DataField, Integer> fieldWithDataPosition : reArrangedDataFieldList) {
-      if (fieldWithDataPosition.getKey().getColumn().hasEncoding(Encoding.DICTIONARY)) {
-        directDictionaryDimensionIndex.add(fieldWithDataPosition.getValue());
+    // get the index of each type and to be used in 3 parts conversion
+    for (CarbonColumn column : listOfColumns) {
+      if (column.hasEncoding(Encoding.DICTIONARY)) {
+        directDictionaryDimensionIndex.add(dataPositionMap.get(column.getColName()));
       } else {
-        if (fieldWithDataPosition.getKey().getColumn().getDataType().isComplexType()) {
-          complexTypeIndex.add(fieldWithDataPosition.getValue());
-        } else if (fieldWithDataPosition.getKey().getColumn().isMeasure()) {
-          measureIndex.add(fieldWithDataPosition.getValue());
+        if (column.getDataType().isComplexType()) {
+          complexTypeIndex.add(dataPositionMap.get(column.getColName()));
+        } else if (column.isMeasure()) {
+          measureIndex.add(dataPositionMap.get(column.getColName()));
         } else {
           // other dimensions
-          otherDimensionIndex.add(fieldWithDataPosition.getValue());
+          otherDimensionIndex.add(dataPositionMap.get(column.getColName()));
         }
       }
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 9672de6..aaa9a38 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -99,9 +99,13 @@ public class TablePage {
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
     int tmpNumDictDimIdx = 0;
     int tmpNumNoDictDimIdx = 0;
-    for (int i = 0; i < dictDimensionPages.length + noDictDimensionPages.length; i++) {
+    for (int i = 0; i < tableSpec.getNumDimensions(); i++) {
       TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i);
-      ColumnType columnType = tableSpec.getDimensionSpec(i).getColumnType();
+      if (spec.getSchemaDataType().isComplexType()) {
+        // skip complex columns and go other dimensions.
+        // partition scenario dimensions can present after complex columns.
+        continue;
+      }
       ColumnPage page;
       if (spec.getSchemaDataType() == DataTypes.DATE) {
         page = ColumnPage.newPage(