You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/08/01 14:38:46 UTC

carbondata git commit: [CARBONDATA-2796][32K]Fix data loading problem when table has complex column and long string column

Repository: carbondata
Updated Branches:
  refs/heads/master af984101e -> 11fb422d9


[CARBONDATA-2796][32K]Fix data loading problem when table has complex column and long string column

Currently, when a carbon row is converted from raw format to 3-parted format, both the varchar columns and the complex columns assume that they are the last members of the noDictionary group. Since the two kinds of columns need to be processed in different ways, an exception occurs if a column is handled in the wrong way.

To fix this, we record the information about complex columns explicitly, as is already done for varchar columns, and keep the columns of the noDictionary group in this order: normal dimensions, then varchar columns, then complex columns.

This closes #2577


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11fb422d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11fb422d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11fb422d

Branch: refs/heads/master
Commit: 11fb422d9d03a5354d746d134bd25ed65e6ca736
Parents: af98410
Author: Manhua <ke...@qq.com>
Authored: Mon Jul 30 15:07:37 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Aug 1 22:38:13 2018 +0800

----------------------------------------------------------------------
 .../complexType/TestComplexDataType.scala       |  7 ----
 .../LocalDictionarySupportAlterTableTest.scala  | 14 +++----
 .../LocalDictionarySupportCreateTableTest.scala |  6 +--
 .../VarcharDataTypesBasicTestCase.scala         | 33 +++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 15 ++++++--
 .../command/carbonTableSchemaCommon.scala       |  8 +---
 .../loading/row/IntermediateSortTempRow.java    | 26 +++++++++----
 .../loading/sort/SortStepRowHandler.java        | 39 +++++++++++++-------
 .../sort/sortdata/TableFieldStat.java           | 31 +++++++++++-----
 .../store/CarbonFactDataHandlerModel.java       |  6 +--
 10 files changed, 125 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
index 1ad7889..8527380 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala
@@ -45,7 +45,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "create table table1 (roll string,person Struct<detail:int>) stored by " +
       "'carbondata'")
     sql("insert into table1 values('abc',1)")
-    sql("select roll,person,roll,person.detail from table1").show(false)
     checkAnswer(sql("select roll,person,person.detail from table1"),
       Seq(Row("abc", Row(1), 1)))
     checkAnswer(sql("select person,person.detail from table1"),
@@ -60,7 +59,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "create table table1 (roll string,person array<int>) stored by " +
       "'carbondata'")
     sql("insert into table1 values('abc','1$2$3')")
-    sql("select roll,person,roll,person from table1").show(false)
     checkAnswer(sql("select roll,person from table1"),
       Seq(Row("abc", mutable.WrappedArray.make(Array(1, 2, 3)))))
   }
@@ -99,8 +97,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "create table table1 (roll int,person Struct<detail:array<string>>) stored by " +
       "'carbondata'")
     sql("insert into table1 values(1,'abc:bcd')")
-    //    sql("select person from table1").show(false)
-    sql("select person.detail[0] from table1").show(false)
     checkAnswer(sql("select person.detail[0] from table1"), Seq(Row("abc")))
     checkAnswer(sql("select person.detail[1] from table1"), Seq(Row("bcd")))
     checkAnswer(sql("select roll,person from table1"),
@@ -164,7 +160,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "'carbondata'")
     sql("insert into table1 values(1,'2018/01/01')")
     checkExistence(sql("select person from table1"), true, "2018-01-01 00:00:00.0")
-    sql("select person,roll,person.detail from table1").show(false)
     checkAnswer(sql("select person,roll,person.detail from table1"),
       Seq(Row(Row(Timestamp.valueOf("2018-01-01 00:00:00.0")), 1,
         Timestamp.valueOf("2018-01-01 00:00:00.0"))))
@@ -227,7 +222,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "'carbondata'")
     sql("insert into table1 values(1,20)")
     checkExistence(sql("select person from table1"), true, "20")
-    sql("select person,person.detail from table1").show(false)
     checkAnswer(sql("select person,roll,person.detail from table1"), Seq(Row(Row(20), 1, 20)))
     checkExistence(sql("select person.detail from table1"), true, "20")
     checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(20))))
@@ -252,7 +246,6 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll {
       "'carbondata'")
     sql("insert into table1 values(1,true)")
     checkExistence(sql("select person from table1"), true, "true")
-    sql("select person,person.detail from table1").show(false)
     checkAnswer(sql("select person,roll,person.detail from table1"), Seq(Row(Row(true), 1, true)))
     checkExistence(sql("select person.detail from table1"), true, "true")
     checkAnswer(sql("select roll,person from table1"), Seq(Row(1, Row(true))))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportAlterTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportAlterTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportAlterTableTest.scala
index 373b309..24af99e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportAlterTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportAlterTableTest.scala
@@ -1235,7 +1235,7 @@ class LocalDictionarySupportAlterTableTest extends QueryTest with BeforeAndAfter
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Exclude")) match {
-      case Some(row) => assert(row.get(1).toString.contains("st.val.si,name"))
+      case Some(row) => assert(row.get(1).toString.contains("name,st.val.si"))
       case None => assert(false)
     }
   }
@@ -1260,7 +1260,7 @@ class LocalDictionarySupportAlterTableTest extends QueryTest with BeforeAndAfter
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
-      case Some(row) => assert(row.get(1).toString.contains("st.val.si,name"))
+      case Some(row) => assert(row.get(1).toString.contains("name,st.val.si"))
       case None => assert(false)
     }
   }
@@ -1284,7 +1284,7 @@ class LocalDictionarySupportAlterTableTest extends QueryTest with BeforeAndAfter
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Exclude")) match {
-      case Some(row) => assert(row.get(1).toString.contains("st.val.sd,name"))
+      case Some(row) => assert(row.get(1).toString.contains("name,st.val.sd"))
       case None => assert(false)
     }
   }
@@ -1309,7 +1309,7 @@ class LocalDictionarySupportAlterTableTest extends QueryTest with BeforeAndAfter
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Exclude")) match {
-      case Some(row) => assert(row.get(1).toString.contains("h,st.val.sd,name"))
+      case Some(row) => assert(row.get(1).toString.contains("h,name,st.val.sd"))
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
@@ -1338,7 +1338,7 @@ class LocalDictionarySupportAlterTableTest extends QueryTest with BeforeAndAfter
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Exclude")) match {
-      case Some(row) => assert(row.get(1).toString.contains("h,st.sd,st.sh.val,name"))
+      case Some(row) => assert(row.get(1).toString.contains("h,name,st.sd,st.sh.val"))
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
@@ -1367,7 +1367,7 @@ class LocalDictionarySupportAlterTableTest extends QueryTest with BeforeAndAfter
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Exclude")) match {
-      case Some(row) => assert(row.get(1).toString.contains("h,st.sd,name"))
+      case Some(row) => assert(row.get(1).toString.contains("h,name,st.sd"))
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
@@ -1396,7 +1396,7 @@ class LocalDictionarySupportAlterTableTest extends QueryTest with BeforeAndAfter
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Exclude")) match {
-      case Some(row) => assert(row.get(1).toString.contains("h,st.val,name"))
+      case Some(row) => assert(row.get(1).toString.contains("h,name,st.val"))
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportCreateTableTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportCreateTableTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportCreateTableTest.scala
index 52c18d0..a02d3ef 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportCreateTableTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/localdictionary/LocalDictionarySupportCreateTableTest.scala
@@ -2364,7 +2364,7 @@ class LocalDictionarySupportCreateTableTest extends QueryTest with BeforeAndAfte
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
-      case Some(row) => assert(row.get(1).toString.contains("st.val.si,name") && !row.get(1).toString.contains("city"))
+      case Some(row) => assert(row.get(1).toString.contains("name,st.val.si") && !row.get(1).toString.contains("city"))
       case None => assert(false)
     }
   }
@@ -2387,7 +2387,7 @@ class LocalDictionarySupportCreateTableTest extends QueryTest with BeforeAndAfte
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Exclude")) match {
-      case Some(row) => assert(row.get(1).toString.contains("st.val.si,name") &&
+      case Some(row) => assert(row.get(1).toString.contains("name,st.val.si") &&
                                !row.get(1).toString.contains("city"))
       case None => assert(false)
     }
@@ -2416,7 +2416,7 @@ class LocalDictionarySupportCreateTableTest extends QueryTest with BeforeAndAfte
       case None => assert(false)
     }
     descLoc.find(_.get(0).toString.contains("Local Dictionary Include")) match {
-      case Some(row) => assert(row.get(1).toString.contains("st.val.si,name") &&
+      case Some(row) => assert(row.get(1).toString.contains("name,st.val.si") &&
                                !row.get(1).toString.contains("city"))
       case None => assert(false)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 4aa7062..b607d07 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -31,6 +31,8 @@ import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.CarbonProperties
 
+import scala.collection.mutable
+
 class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
   private val longStringTable = "long_string_table"
   private val inputDir = s"$resourcesPath${File.separator}varchartype${File.separator}"
@@ -318,6 +320,37 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
     sql(s"DROP DATAMAP IF EXISTS $datamapName ON TABLE $longStringTable")
   }
 
+  test("create table with varchar column and complex column") {
+    sql("DROP TABLE IF EXISTS varchar_complex_table")
+    sql("""
+        | CREATE TABLE varchar_complex_table
+        | (m1 int,arr1 array<string>,varchar1 string,s1 string,varchar2 string,arr2 array<string>)
+        | STORED BY 'carbondata'
+        | TBLPROPERTIES('long_string_columns'='varchar1,varchar2')
+        | """.stripMargin)
+    sql(
+      """
+        | INSERT INTO TABLE varchar_complex_table
+        | VALUES(1,'ar1.0$ar1.1','longstr10','normal string1','longstr11','ar2.0$ar2.1'),
+        | (2,'ar1.2$ar1.3','longstr20','normal string2','longstr21','ar2.2$ar2.3')
+        | """.stripMargin)
+    checkAnswer(
+      sql("SELECT * FROM varchar_complex_table where varchar1='longstr10'"),
+      Seq(Row(1,mutable.WrappedArray.make(Array("ar1.0","ar1.1")),"longstr10","normal string1",
+        "longstr11",mutable.WrappedArray.make(Array("ar2.0","ar2.1")))))
+    checkAnswer(
+      sql(
+        """
+          |SELECT varchar1,arr2,s1,m1,varchar2,arr1
+          |FROM varchar_complex_table
+          |WHERE arr1[1]='ar1.3'
+          |""".stripMargin),
+      Seq(Row("longstr20",mutable.WrappedArray.make(Array("ar2.2","ar2.3")),"normal string2",2,
+        "longstr21",mutable.WrappedArray.make(Array("ar1.2","ar1.3")))))
+
+    sql("DROP TABLE IF EXISTS varchar_complex_table")
+  }
+
     // ignore this test in CI, because it will need at least 4GB memory to run successfully
   ignore("Exceed 2GB per column page for varchar datatype") {
     deleteFile(inputFile_2g_column_page)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 2599b3f..bb68ec5 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -225,17 +225,24 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
   protected val escapedIdentifier = "`([^`]+)`".r
 
-  private def reorderDimensions(dims: Seq[Field]): Seq[Field] = {
-    var complexDimensions: Seq[Field] = Seq()
+  private def reorderDimensions(dims: Seq[Field], varcharCols: Seq[String]): Seq[Field] = {
     var dimensions: Seq[Field] = Seq()
+    var varcharDimensions: Seq[Field] = Seq()
+    var complexDimensions: Seq[Field] = Seq()
     dims.foreach { dimension =>
       dimension.dataType.getOrElse("NIL") match {
         case "Array" => complexDimensions = complexDimensions :+ dimension
         case "Struct" => complexDimensions = complexDimensions :+ dimension
+        case "String" =>
+          if (varcharCols.exists(dimension.column.equalsIgnoreCase)) {
+            varcharDimensions = varcharDimensions :+ dimension
+          } else {
+            dimensions = dimensions :+ dimension
+          }
         case _ => dimensions = dimensions :+ dimension
       }
     }
-    dimensions ++ complexDimensions
+    dimensions ++ varcharDimensions ++ complexDimensions
   }
 
   /**
@@ -415,7 +422,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       dbName,
       tableName,
       tableProperties.toMap,
-      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
+      reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f)), varcharColumns),
       msrs.map(f => normalizeType(f)),
       Option(sortKeyDims),
       Option(varcharColumns),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index a61f94f..4a99ac7 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -689,12 +689,8 @@ class TableNewProcessor(cm: TableModel) {
         }
       }
     }
-    // dimensions that are not varchar
-    cm.dimCols.filter(field => !cm.varcharCols.get.contains(field.column))
-      .foreach(addDimensionCol(_))
-    // dimensions that are varchar
-    cm.dimCols.filter(field => cm.varcharCols.get.contains(field.column))
-      .foreach(addDimensionCol(_))
+    // add all dimensions
+    cm.dimCols.foreach(addDimensionCol(_))
 
     // check whether the column is a local dictionary column and set in column schema
     if (null != cm.tableProperties) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 8bec099..47b419e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -54,13 +54,15 @@ public class IntermediateSortTempRow {
   /**
    * deserialize from bytes array to get the no sort fields
    * @param outDictNoSort stores the dict & no-sort fields
-   * @param outNoDictNoSortAndVarcharDims stores the no-dict & no-sort fields,
- *                                    including complex and varchar fields
+   * @param outNoDictNoSort stores all no-dict & no-sort fields,
+   *                        including complex and varchar fields
    * @param outMeasures stores the measure fields
    * @param dataTypes data type for the measure
+   * @param varcharDimCnt number of varchar columns
+   * @param complexDimCnt number of complex columns
    */
-  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSortAndVarcharDims,
-      Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt) {
+  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
+      Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt, int complexDimCnt) {
     ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
     // read dict_no_sort
     int dictNoSortCnt = outDictNoSort.length;
@@ -68,13 +70,13 @@ public class IntermediateSortTempRow {
       outDictNoSort[i] = rowBuffer.getInt();
     }
 
-    // read no_dict_no_sort (including complex)
-    int noDictNoSortCnt = outNoDictNoSortAndVarcharDims.length - varcharDimCnt;
+    // read no_dict_no_sort
+    int noDictNoSortCnt = outNoDictNoSort.length - varcharDimCnt - complexDimCnt;
     for (int i = 0; i < noDictNoSortCnt; i++) {
       short len = rowBuffer.getShort();
       byte[] bytes = new byte[len];
       rowBuffer.get(bytes);
-      outNoDictNoSortAndVarcharDims[i] = bytes;
+      outNoDictNoSort[i] = bytes;
     }
 
     // read varchar dims
@@ -82,7 +84,15 @@ public class IntermediateSortTempRow {
       int len = rowBuffer.getInt();
       byte[] bytes = new byte[len];
       rowBuffer.get(bytes);
-      outNoDictNoSortAndVarcharDims[i + noDictNoSortCnt] = bytes;
+      outNoDictNoSort[i + noDictNoSortCnt] = bytes;
+    }
+
+    // read complex dims
+    for (int i = 0; i < complexDimCnt; i++) {
+      short len = rowBuffer.getShort();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      outNoDictNoSort[i + noDictNoSortCnt + varcharDimCnt] = bytes;
     }
 
     // read measure

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index 1c6d8b2..0118e4d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -47,6 +47,7 @@ public class SortStepRowHandler implements Serializable {
   private int noDictSortDimCnt = 0;
   private int noDictNoSortDimCnt = 0;
   private int varcharDimCnt = 0;
+  private int complexDimCnt = 0;
   private int measureCnt;
 
   // indices for dict & sort dimension columns
@@ -55,9 +56,10 @@ public class SortStepRowHandler implements Serializable {
   private int[] dictNoSortDimIdx;
   // indices for no-dict & sort dimension columns
   private int[] noDictSortDimIdx;
-  // indices for no-dict & no-sort dimension columns, including complex columns
+  // indices for no-dict & no-sort dimension columns, excluding complex/varchar columns
   private int[] noDictNoSortDimIdx;
   private int[] varcharDimIdx;
+  private int[] complexDimIdx;
   // indices for measure columns
   private int[] measureIdx;
 
@@ -73,12 +75,14 @@ public class SortStepRowHandler implements Serializable {
     this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
     this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
     this.varcharDimCnt = tableFieldStat.getVarcharDimCnt();
+    this.complexDimCnt = tableFieldStat.getComplexDimCnt();
     this.measureCnt = tableFieldStat.getMeasureCnt();
     this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
     this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
     this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
     this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
     this.varcharDimIdx = tableFieldStat.getVarcharDimIdx();
+    this.complexDimIdx = tableFieldStat.getComplexDimIdx();
     this.measureIdx = tableFieldStat.getMeasureIdx();
     this.dataTypes = tableFieldStat.getMeasureDataType();
   }
@@ -104,8 +108,8 @@ public class SortStepRowHandler implements Serializable {
     try {
       int[] dictDims
           = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-      byte[][] nonDictArray
-          = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt][];
+      byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt
+                                       + this.varcharDimCnt + this.complexDimCnt ][];
       Object[] measures = new Object[this.measureCnt];
 
       // convert dict & data
@@ -131,6 +135,10 @@ public class SortStepRowHandler implements Serializable {
       for (int idx = 0; idx < this.varcharDimCnt; idx++) {
         nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]];
       }
+      // convert complex dims
+      for (int idx = 0; idx < this.complexDimCnt; idx++) {
+        nonDictArray[idxAcc++] = (byte[]) row[this.complexDimIdx[idx]];
+      }
 
       // convert measure data
       for (int idx = 0; idx < this.measureCnt; idx++) {
@@ -152,18 +160,17 @@ public class SortStepRowHandler implements Serializable {
    * @return 3-parted row
    */
   public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
-    int[] dictDims
-        = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictArray
-        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt][];
+    int[] dictDims = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
+    byte[][] noDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt
+                                    + this.varcharDimCnt + this.complexDimCnt][];
 
     int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
-    byte[][] noDictNoSortAndVarcharDims
-        = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt][];
+    byte[][] noDictNoSortAndVarcharComplexDims
+        = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt + this.complexDimCnt][];
     Object[] measures = new Object[this.measureCnt];
 
-    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharDims, measures,
-        this.dataTypes, this.varcharDimCnt);
+    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharComplexDims, measures,
+        this.dataTypes, this.varcharDimCnt, this.complexDimCnt);
 
     // dict dims
     System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
@@ -174,8 +181,8 @@ public class SortStepRowHandler implements Serializable {
     // no dict dims, including complex
     System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
         noDictArray, 0, this.noDictSortDimCnt);
-    System.arraycopy(noDictNoSortAndVarcharDims, 0, noDictArray,
-        this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt);
+    System.arraycopy(noDictNoSortAndVarcharComplexDims, 0, noDictArray,
+        this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt + this.complexDimCnt);
 
     // measures are already here
 
@@ -445,6 +452,12 @@ public class SortStepRowHandler implements Serializable {
       rowBuffer.putInt(bytes.length);
       rowBuffer.put(bytes);
     }
+    // convert complex dims
+    for (int idx = 0; idx < this.complexDimCnt; idx++) {
+      byte[] bytes = (byte[]) row[this.complexDimIdx[idx]];
+      rowBuffer.putShort((short) bytes.length);
+      rowBuffer.put(bytes);
+    }
 
     // convert measure
     Object tmpValue;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 094bd83..353ddb4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -33,8 +33,10 @@ public class TableFieldStat implements Serializable {
   private int dictSortDimCnt = 0;
   private int dictNoSortDimCnt = 0;
   private int noDictSortDimCnt = 0;
-  // for columns that are no_dict_dim and no_sort_dim and complex, except the varchar dims
+  // for columns that are no_dict_dim and no_sort_dim, except complex/varchar dims
   private int noDictNoSortDimCnt = 0;
+  // for columns that are complex data type
+  private int complexDimCnt = 0;
   // for columns that are varchar data type
   private int varcharDimCnt = 0;
   // whether sort column is of dictionary type or not
@@ -49,17 +51,19 @@ public class TableFieldStat implements Serializable {
   private int[] dictNoSortDimIdx;
   // indices for no-dict & sort dimension columns
   private int[] noDictSortDimIdx;
-  // indices for no-dict & no-sort dimension columns, including complex columns
+  // indices for no-dict & no-sort dimension columns, excluding complex/varchar columns
   private int[] noDictNoSortDimIdx;
   // indices for varchar dimension columns
   private int[] varcharDimIdx;
+  // indices for complex dimension columns
+  private int [] complexDimIdx;
   // indices for measure columns
   private int[] measureIdx;
 
   public TableFieldStat(SortParameters sortParameters) {
     int noDictDimCnt = sortParameters.getNoDictionaryCount();
-    int complexDimCnt = sortParameters.getComplexDimColCount();
     int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
+    this.complexDimCnt = sortParameters.getComplexDimColCount();
     this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
     this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn();
     int sortColCnt = isSortColNoDictFlags.length;
@@ -83,8 +87,8 @@ public class TableFieldStat implements Serializable {
     this.dictSortDimIdx = new int[dictSortDimCnt];
     this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt];
     this.noDictSortDimIdx = new int[noDictSortDimCnt];
-    this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt
-        - varcharDimCnt];
+    this.noDictNoSortDimIdx = new int[noDictDimCnt - noDictSortDimCnt - varcharDimCnt];
+    this.complexDimIdx = new int[complexDimCnt];
     this.varcharDimIdx = new int[varcharDimCnt];
     this.measureIdx = new int[measureCnt];
 
@@ -113,13 +117,13 @@ public class TableFieldStat implements Serializable {
       }
     }
     dictNoSortDimCnt = tmpDictNoSortCnt;
+    noDictNoSortDimCnt = tmpNoDictNoSortCnt;
 
     int base = isDimNoDictFlags.length;
-    // adding complex dimension columns
+    // indices for complex dimension columns
     for (int i = 0; i < complexDimCnt; i++) {
-      noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = base + i;
+      complexDimIdx[i] = base + i;
     }
-    noDictNoSortDimCnt = tmpNoDictNoSortCnt;
 
     base += complexDimCnt;
     // indices for measure columns
@@ -144,6 +148,10 @@ public class TableFieldStat implements Serializable {
     return noDictNoSortDimCnt;
   }
 
+  public int getComplexDimCnt() {
+    return complexDimCnt;
+  }
+
   public int getVarcharDimCnt() {
     return varcharDimCnt;
   }
@@ -180,6 +188,10 @@ public class TableFieldStat implements Serializable {
     return noDictNoSortDimIdx;
   }
 
+  public int[] getComplexDimIdx() {
+    return complexDimIdx;
+  }
+
   public int[] getVarcharDimIdx() {
     return varcharDimIdx;
   }
@@ -196,12 +208,13 @@ public class TableFieldStat implements Serializable {
         && dictNoSortDimCnt == that.dictNoSortDimCnt
         && noDictSortDimCnt == that.noDictSortDimCnt
         && noDictNoSortDimCnt == that.noDictNoSortDimCnt
+        && complexDimCnt == that.complexDimCnt
         && varcharDimCnt == that.varcharDimCnt
         && measureCnt == that.measureCnt;
   }
 
   @Override public int hashCode() {
     return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
-        noDictNoSortDimCnt, varcharDimCnt, measureCnt);
+        noDictNoSortDimCnt, complexDimCnt, varcharDimCnt, measureCnt);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/11fb422d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 7201305..26ee65a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -224,13 +224,12 @@ public class CarbonFactDataHandlerModel {
 
     // for dynamic page size in write step if varchar columns exist
     List<Integer> varcharDimIdxInNoDict = new ArrayList<>();
-    int dictDimCount = configuration.getDimensionCount() - configuration.getNoDictionaryCount();
     for (DataField dataField : configuration.getDataFields()) {
       CarbonColumn column = dataField.getColumn();
       if (!column.isComplex() && !dataField.hasDictionaryEncoding() &&
               column.getDataType() == DataTypes.VARCHAR) {
         // ordinal is set in CarbonTable.fillDimensionsAndMeasuresForTables()
-        varcharDimIdxInNoDict.add(column.getOrdinal() - dictDimCount);
+        varcharDimIdxInNoDict.add(column.getOrdinal() - simpleDimsCount);
       }
     }
 
@@ -319,7 +318,8 @@ public class CarbonFactDataHandlerModel {
     // for dynamic page size in write step if varchar columns exist
     List<Integer> varcharDimIdxInNoDict = new ArrayList<>();
     List<CarbonDimension> allDimensions = carbonTable.getDimensions();
-    int dictDimCount = allDimensions.size() - segmentProperties.getNumberOfNoDictionaryDimension();
+    int dictDimCount = allDimensions.size() - segmentProperties.getNumberOfNoDictionaryDimension()
+            - segmentProperties.getComplexDimensions().size();
     for (CarbonDimension dim : allDimensions) {
       if (!dim.isComplex() && !dim.hasEncoding(Encoding.DICTIONARY) &&
           dim.getDataType() == DataTypes.VARCHAR) {