You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/12/28 11:52:28 UTC

carbondata git commit: [CARBONDATA-3196] [CARBONDATA-3203] Fixed Compaction for Complex types with Dictionary Include and also supported Compaction for restructured table

Repository: carbondata
Updated Branches:
  refs/heads/master f5c1b7bbd -> 7c4e79fca


[CARBONDATA-3196] [CARBONDATA-3203] Fixed Compaction for Complex types with Dictionary Include and also supported Compaction for restructured table

Problem1: Compaction was failing for complex data types with Dictionary Include because the KeyGenerator was not being set in the model for
Dictionary Include complex columns, and Dictionary Include complex columns were not handled when finding cardinality.

Solution: Handled both of these issues by setting the KeyGenerator and storing the cardinality of Dictionary Include complex columns.

Problem2: Compaction was failing for a restructured table containing Dictionary Include complex columns.

Solution: Handled complex columns for this case by inserting the correct indices of the columns.

This closes #3022


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7c4e79fc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7c4e79fc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7c4e79fc

Branch: refs/heads/master
Commit: 7c4e79fca8e6aac044bbadaf1210b1be2f3b8a8e
Parents: f5c1b7b
Author: manishnalla1994 <ma...@gmail.com>
Authored: Mon Dec 24 17:37:36 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Fri Dec 28 17:27:13 2018 +0530

----------------------------------------------------------------------
 .../core/scan/wrappers/ByteArrayWrapper.java    |  4 ++
 .../src/test/resources/structofarray.csv        | 10 +++
 .../complexType/TestCompactionComplexType.scala | 65 ++++++++++++++++++++
 .../loading/CarbonDataLoadConfiguration.java    | 32 +---------
 .../merger/CompactionResultSortProcessor.java   | 14 ++++-
 .../sort/sortdata/SortParameters.java           |  2 +-
 .../sort/sortdata/TableFieldStat.java           | 34 +++++-----
 .../store/CarbonFactDataHandlerModel.java       | 31 +++++++---
 .../store/CarbonFactHandlerFactory.java         |  1 -
 .../util/CarbonDataProcessorUtil.java           | 33 ++++++++++
 10 files changed, 171 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
index 65f29d4..1b903f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/wrappers/ByteArrayWrapper.java
@@ -81,6 +81,10 @@ public class ByteArrayWrapper implements Comparable<ByteArrayWrapper>, Serializa
     return this.noDictionaryKeys[index];
   }
 
+  public byte[] getComplexKeyByIndex(int index) {
+    return this.complexTypesKeys[index];
+  }
+
   /**
    * to get the no dictionary column data
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/integration/spark-common-test/src/test/resources/structofarray.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/structofarray.csv b/integration/spark-common-test/src/test/resources/structofarray.csv
new file mode 100644
index 0000000..ef21b44
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/structofarray.csv
@@ -0,0 +1,10 @@
+Cust00000000000000000000,2015,1,20,M,SSC,Y,123456789$2015-01-01  00:00:00$100&3000$100.123&3000.234$United Kingdom&England$2015-01-01  00:00:00&2014-01-01  00:00:00,42,104,160,325046028.8,859616748.6
+Cust00000000000000000001,2015,1,30,F,Degree,N,123456790$2015-01-02  00:00:00$101&3000$101.123&3001.234$United States&MO$2015-01-02  00:00:00&2014-01-02  00:00:00,141,181,54,378476092.1,818599132.6
+Cust00000000000000000002,2015,1,40,M,graduation,D,123456791$2015-01-03  00:00:00$102&3000$102.123&3002.234$United States&OR$2015-01-03  00:00:00&2014-01-03  00:00:00,138,43,175,408335001.4,906020942.6
+Cust00000000000000000003,2015,1,50,F,PG,Y,123456792$2015-01-04  00:00:00$103&3000$103.123&3003.234$Australia&Victoria$2015-01-04  00:00:00&2014-01-04  00:00:00,96,63,184,493146274.5,556184083.3
+Cust00000000000000000004,2015,1,60,M,MS,N,123456793$2015-01-05  00:00:00$104&3000$104.123&3004.234$United States&AL$2015-01-05  00:00:00&2014-01-05  00:00:00,115,172,165,457941392.3,641744932.5
+Cust00000000000000000005,2015,1,70,F,Doctor,D,123456794$2015-01-06  00:00:00$105&3000$105.123&3005.234$United States&NJ$2015-01-06  00:00:00&2014-01-06  00:00:00,178,192,178,112452170.2,502438883.3
+Cust00000000000000000006,2015,1,80,M,Layer,Y,123456795$2015-01-07  00:00:00$106&3000$106.123&3006.234$United States&IL$2015-01-07  00:00:00&2014-01-07  00:00:00,172,194,49,943273831.2,37711205.33
+Cust00000000000000000007,2015,1,90,F,Cop,N,123456796$2015-01-08  00:00:00$107&3000$107.123&3007.234$United States&TN$2015-01-08  00:00:00&2014-01-08  00:00:00,163,23,180,991766321.3,452456856.7
+Cust00000000000000000008,2015,1,95,M,Bank,D,123456797$2015-01-09  00:00:00$108&3000$108.123&3008.234$Israel&Tel Aviv$2015-01-09  00:00:00&2014-01-09  00:00:00,113,18,176,747561503.5,388896200.6
+Cust00000000000000000009,2015,1,45,F,Group1,Y,123456798$2015-01-10  00:00:00$109&3000$109.123&3009.234$France&Ile-de-France$2015-01-10  00:00:00&2014-01-10  00:00:00,50,99,10,667010292.4,910085933.7
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCompactionComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCompactionComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCompactionComplexType.scala
index e00d4b6..64ded63 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCompactionComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestCompactionComplexType.scala
@@ -40,6 +40,7 @@ class TestCompactionComplexType extends QueryTest with BeforeAndAfterAll {
   }
 
   override protected def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS compactComplex")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, compactionThreshold)
   }
@@ -1068,4 +1069,68 @@ class TestCompactionComplexType extends QueryTest with BeforeAndAfterAll {
     sql("Drop table if exists adaptive")
   }
 
+  test("Test major compaction with dictionary include for struct of array type") {
+    sql("DROP TABLE IF EXISTS compactComplex")
+    sql(
+      "CREATE TABLE compactComplex(CUST_ID string,YEAR int, MONTH int, AGE int, GENDER string,EDUCATED " +
+      "string,IS_MARRIED " +
+      "string," +
+      "STRUCT_OF_ARRAY struct<ID:int,CHECK_DATE:string,SNo:array<int>,sal1:array<double>," +
+      "state:array<string>," +
+      "date1:array<string>>,CARD_COUNT int,DEBIT_COUNT int,CREDIT_COUNT int, DEPOSIT double, " +
+      "HQ_DEPOSIT double) STORED BY 'carbondata'" +
+      "TBLPROPERTIES('DICTIONARY_INCLUDE'='STRUCT_OF_ARRAY,DEPOSIT,HQ_DEPOSIT')")
+    sql(
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/structofarray.csv' INTO TABLE compactComplex OPTIONS" +
+      s"('DELIMITER'=',','QUOTECHAR'='\'," +
+      "'FILEHEADER'='CUST_ID,YEAR,MONTH,AGE, GENDER,EDUCATED,IS_MARRIED,STRUCT_OF_ARRAY," +
+      "CARD_COUNT," +
+      "DEBIT_COUNT,CREDIT_COUNT, DEPOSIT,HQ_DEPOSIT','COMPLEX_DELIMITER_LEVEL_1'='$', " +
+      "'COMPLEX_DELIMITER_LEVEL_2'='&')")
+    sql(
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/structofarray.csv' INTO TABLE compactComplex OPTIONS" +
+      s"('DELIMITER'=',','QUOTECHAR'='\'," +
+      "'FILEHEADER'='CUST_ID,YEAR,MONTH,AGE, GENDER,EDUCATED,IS_MARRIED,STRUCT_OF_ARRAY," +
+      "CARD_COUNT," +
+      "DEBIT_COUNT,CREDIT_COUNT, DEPOSIT,HQ_DEPOSIT','COMPLEX_DELIMITER_LEVEL_1'='$', " +
+      "'COMPLEX_DELIMITER_LEVEL_2'='&')")
+    sql(
+      s"LOAD DATA LOCAL INPATH '$resourcesPath/structofarray.csv' INTO TABLE compactComplex OPTIONS" +
+      s"('DELIMITER'=',','QUOTECHAR'='\'," +
+      "'FILEHEADER'='CUST_ID,YEAR,MONTH,AGE,GENDER,EDUCATED,IS_MARRIED,STRUCT_OF_ARRAY," +
+      "CARD_COUNT," +
+      "DEBIT_COUNT,CREDIT_COUNT, DEPOSIT,HQ_DEPOSIT','COMPLEX_DELIMITER_LEVEL_1'='$', " +
+      "'COMPLEX_DELIMITER_LEVEL_2'='&')")
+    sql("ALTER TABLE compactComplex COMPACT 'major'")
+    checkAnswer(sql("Select count(*) from compactComplex"), Row(30))
+  }
+
+  test("Test Compaction for complex types with table restructured") {
+    sql("drop table if exists compactComplex")
+    sql(
+      """
+        | create table compactComplex (
+        | name string,
+        | age int,
+        | number string,
+        | structfield struct<a:array<int> ,b:int>
+        | )
+        | stored by 'carbondata'
+        | TBLPROPERTIES(
+        | 'DICTIONARY_INCLUDE'='name,age,number,structfield'
+        | )
+      """.stripMargin)
+    sql("INSERT into compactComplex values('man',25,'222','1000\0022000\0011')")
+    sql("INSERT into compactComplex values('can',24,'333','1000\0022000\0012')")
+    sql("INSERT into compactComplex values('dan',25,'222','1000\0022000\0013')")
+    sql("ALTER TABLE compactComplex drop columns(age)")
+    sql("ALTER TABLE compactComplex COMPACT 'major'")
+    checkAnswer(sql("SELECT * FROM compactComplex"),
+      Seq(Row("man", "222", Row(mutable.WrappedArray.make(Array(1000, 2000)), 1)),
+        Row("can", "333", Row(mutable.WrappedArray.make(Array(1000, 2000)), 2)),
+        Row("dan", "222", Row(mutable.WrappedArray.make(Array(1000, 2000)), 3))
+      ))
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 54dc2d4..8cdb6af 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -24,8 +24,6 @@ import java.util.Map;
 
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -381,35 +379,11 @@ public class CarbonDataLoadConfiguration {
     return sortColumnMapping;
   }
 
-  public int[] calcDimensionLengths() {
-    int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
-    if (!isSortTable()) {
-      for (int i = 0; i < dimLensWithComplex.length; i++) {
-        if (dimLensWithComplex[i] != 0) {
-          dimLensWithComplex[i] = Integer.MAX_VALUE;
-        }
-      }
-    }
-    List<Integer> dimsLenList = new ArrayList<Integer>();
-    for (int eachDimLen : dimLensWithComplex) {
-      if (eachDimLen != 0) dimsLenList.add(eachDimLen);
-    }
-    int[] dimLens = new int[dimsLenList.size()];
-    for (int i = 0; i < dimsLenList.size(); i++) {
-      dimLens[i] = dimsLenList.get(i);
-    }
-    return dimLens;
+  public int[] getCardinalityForComplexDimension() {
+    return getCardinalityFinder().getCardinality();
   }
 
-  public KeyGenerator[] createKeyGeneratorForComplexDimension() {
-    int[] dimLens = calcDimensionLengths();
-    KeyGenerator[] complexKeyGenerators = new KeyGenerator[dimLens.length];
-    for (int i = 0; i < dimLens.length; i++) {
-      complexKeyGenerators[i] =
-          KeyGeneratorFactory.getKeyGenerator(new int[] { dimLens[i] });
-    }
-    return complexKeyGenerators;
-  }
+
 
   public TableSpec getTableSpec() {
     return tableSpec;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index d14f626..34f0572 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.processing.merger;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
@@ -299,7 +300,9 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
   private Object[] prepareRowObjectForSorting(Object[] row) {
     ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
     // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount];
-    List<CarbonDimension> dimensions = segmentProperties.getDimensions();
+    List<CarbonDimension> dimensions = new ArrayList<>();
+    dimensions.addAll(segmentProperties.getDimensions());
+    dimensions.addAll(segmentProperties.getComplexDimensions());
     Object[] preparedRow = new Object[dimensions.size() + measureCount];
     // convert the dictionary from MDKey to surrogate key
     byte[] dictionaryKey = wrapper.getDictionaryKey();
@@ -310,12 +313,14 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     }
     int noDictionaryIndex = 0;
     int dictionaryIndex = 0;
+    int complexIndex = 0;
+
     for (int i = 0; i < dimensions.size(); i++) {
       CarbonDimension dims = dimensions.get(i);
-      if (dims.hasEncoding(Encoding.DICTIONARY)) {
+      if (dims.hasEncoding(Encoding.DICTIONARY) && !dims.isComplex()) {
         // dictionary
         preparedRow[i] = dictionaryValues[dictionaryIndex++];
-      } else {
+      } else if (!dims.isComplex()) {
         // no dictionary dims
         byte[] noDictionaryKeyByIndex = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
         if (DataTypeUtil.isPrimitiveColumn(dims.getDataType())) {
@@ -331,8 +336,11 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
         } else {
           preparedRow[i] = noDictionaryKeyByIndex;
         }
+      } else {
+        preparedRow[i] = wrapper.getComplexKeyByIndex(complexIndex++);
       }
     }
+
     // fill all the measures
     // measures will always start from 1st index in the row object array
     int measureIndexInRow = 1;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 200c5f4..5d78f3f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -499,7 +499,7 @@ public class SortParameters implements Serializable {
     parameters.setSegmentId(segmentId);
     parameters.setTaskNo(taskNo);
     parameters.setMeasureColCount(measureColCount);
-    parameters.setDimColCount(dimColCount - complexDimColCount);
+    parameters.setDimColCount(dimColCount);
     parameters.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     parameters.setNoDictionaryCount(noDictionaryCount);
     parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index e9ed6f3..ef92bbc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -18,9 +18,12 @@
 package org.apache.carbondata.processing.sort.sortdata;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Objects;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 
 /**
  * This class is used to hold field information for a table during data loading. These information
@@ -103,34 +106,37 @@ public class TableFieldStat implements Serializable {
     int tmpDictSortCnt = 0;
     int tmpDictNoSortCnt = 0;
     int tmpVarcharCnt = 0;
+    int tmpComplexcount = 0;
 
-    for (int i = 0; i < isDimNoDictFlags.length; i++) {
-      if (isDimNoDictFlags[i]) {
+    List<CarbonDimension> allDimensions =
+        sortParameters.getCarbonTable().getDimensionByTableName(sortParameters.getTableName());
+
+    for (int i = 0; i < allDimensions.size(); i++) {
+      CarbonDimension carbonDimension = allDimensions.get(i);
+      if (carbonDimension.hasEncoding(Encoding.DICTIONARY) && !carbonDimension.isComplex()) {
+        if (carbonDimension.isSortColumn()) {
+          dictSortDimIdx[tmpDictSortCnt++] = i;
+        } else {
+          dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
+        }
+      } else if (!carbonDimension.isComplex()) {
         if (isVarcharDimFlags[i]) {
           varcharDimIdx[tmpVarcharCnt++] = i;
-        } else if (sortColumn[i]) {
+        } else if (carbonDimension.isSortColumn()) {
           noDictSortDimIdx[tmpNoDictSortCnt++] = i;
         } else {
           noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
         }
       } else {
-        if (sortColumn[i]) {
-          dictSortDimIdx[tmpDictSortCnt++] = i;
-        } else {
-          dictNoSortDimIdx[tmpDictNoSortCnt++] = i;
-        }
+        complexDimIdx[tmpComplexcount++] = i;
       }
     }
+
     dictNoSortDimCnt = tmpDictNoSortCnt;
     noDictNoSortDimCnt = tmpNoDictNoSortCnt;
 
-    int base = isDimNoDictFlags.length;
-    // indices for complex dimension columns
-    for (int i = 0; i < complexDimCnt; i++) {
-      complexDimIdx[i] = base + i;
-    }
+    int base = allDimensions.size();
 
-    base += complexDimCnt;
     // indices for measure columns
     for (int i = 0; i < measureCnt; i++) {
       measureIdx[i] = base + i;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index c60da45..c0fab18 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -215,7 +215,9 @@ public class CarbonFactDataHandlerModel {
     SegmentProperties segmentProperties =
         new SegmentProperties(wrapperColumnSchema, colCardinality);
 
-    int[] dimLens = configuration.calcDimensionLengths();
+    int[] dimLens = CarbonDataProcessorUtil
+        .calcDimensionLengths(configuration.getNumberOfSortColumns(),
+            configuration.getCardinalityForComplexDimension());
 
     int dimensionCount = configuration.getDimensionCount();
     int noDictionaryCount = configuration.getNoDictionaryCount();
@@ -273,8 +275,9 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
-    carbonFactDataHandlerModel.setComplexDimensionKeyGenerator(
-        configuration.createKeyGeneratorForComplexDimension());
+    carbonFactDataHandlerModel.setComplexDimensionKeyGenerator(CarbonDataProcessorUtil
+        .createKeyGeneratorForComplexDimension(configuration.getNumberOfSortColumns(),
+            configuration.getCardinalityForComplexDimension()));
     carbonFactDataHandlerModel.bucketId = bucketId;
     carbonFactDataHandlerModel.segmentId = configuration.getSegmentId();
     carbonFactDataHandlerModel.taskExtension = taskExtension;
@@ -356,9 +359,20 @@ public class CarbonFactDataHandlerModel {
         .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
             carbonTable.getMeasureByTableName(tableName));
     carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
-    // get the cardinality for all all the columns including no dictionary columns
-    int[] formattedCardinality = CarbonUtil
-        .getFormattedCardinality(segmentProperties.getDimColumnsCardinality(), wrapperColumnSchema);
+    // get the cardinality for all all the columns including no
+    // dictionary columns and complex columns
+    int[] dimAndComplexColumnCardinality =
+        new int[segmentProperties.getDimColumnsCardinality().length + segmentProperties
+            .getComplexDimColumnCardinality().length];
+    for (int i = 0; i < segmentProperties.getDimColumnsCardinality().length; i++) {
+      dimAndComplexColumnCardinality[i] = segmentProperties.getDimColumnsCardinality()[i];
+    }
+    for (int i = 0; i < segmentProperties.getComplexDimColumnCardinality().length; i++) {
+      dimAndComplexColumnCardinality[segmentProperties.getDimColumnsCardinality().length + i] =
+          segmentProperties.getComplexDimColumnCardinality()[i];
+    }
+    int[] formattedCardinality =
+        CarbonUtil.getFormattedCardinality(dimAndComplexColumnCardinality, wrapperColumnSchema);
     carbonFactDataHandlerModel.setColCardinality(formattedCardinality);
 
     carbonFactDataHandlerModel.setComplexIndexMap(
@@ -376,6 +390,9 @@ public class CarbonFactDataHandlerModel {
     carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
     carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
     carbonFactDataHandlerModel.setColumnCompressor(loadModel.getColumnCompressor());
+    carbonFactDataHandlerModel.setComplexDimensionKeyGenerator(CarbonDataProcessorUtil
+        .createKeyGeneratorForComplexDimension(carbonTable.getNumberOfSortColumns(),
+            segmentProperties.getComplexDimColumnCardinality()));
 
     carbonFactDataHandlerModel.tableSpec = new TableSpec(carbonTable);
     DataMapWriterListener listener = new DataMapWriterListener();
@@ -419,7 +436,7 @@ public class CarbonFactDataHandlerModel {
 
   private static Map<Integer, GenericDataType> getComplexMap(String isNullFormat,
       int simpleDimsCount, DataField[] dataFields) {
-    int surrIndex = simpleDimsCount;
+    int surrIndex = 0;
     Iterator<Map.Entry<String, GenericDataType>> complexMap =
         CarbonDataProcessorUtil.getComplexTypesMap(dataFields, isNullFormat).entrySet().iterator();
     Map<Integer, GenericDataType> complexIndexMap = new HashMap<>(dataFields.length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java
index 79a8a86..f833859 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactHandlerFactory.java
@@ -25,7 +25,6 @@ public final class CarbonFactHandlerFactory {
   /**
    * Creating fact handler to write data.
    * @param model
-   * @param handlerType
    * @return
    */
   public static CarbonFactHandler createCarbonFactHandler(CarbonFactDataHandlerModel model) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c4e79fc/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 1cff96d..9ce96d4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,6 +34,8 @@ import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.SortScopeOptions;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -699,4 +701,35 @@ public final class CarbonDataProcessorUtil {
     return iterators;
   }
 
+  public static int[] calcDimensionLengths(int numberOfSortColumns, int[] complexCardinality) {
+    // For no sort we are having the cardinality as MAX_VALUE for any non zero value
+    if (numberOfSortColumns == 0) {
+      for (int i = 0; i < complexCardinality.length; i++) {
+        if (complexCardinality[i] != 0) {
+          complexCardinality[i] = Integer.MAX_VALUE;
+        }
+      }
+    }
+    List<Integer> dimsLenList = new ArrayList<Integer>();
+    for (int eachDimLen : complexCardinality) {
+      if (eachDimLen != 0) dimsLenList.add(eachDimLen);
+    }
+    int[] dimLens = new int[dimsLenList.size()];
+    for (int i = 0; i < dimsLenList.size(); i++) {
+      dimLens[i] = dimsLenList.get(i);
+    }
+    return dimLens;
+  }
+
+  // This will give us KeyGenerators for all the children inside the complex datatype
+  public static KeyGenerator[] createKeyGeneratorForComplexDimension(int numberOfSortColumns,
+      int[] complexCardinality) {
+    int[] dimLens = calcDimensionLengths(numberOfSortColumns, complexCardinality);
+    KeyGenerator[] complexKeyGenerators = new KeyGenerator[dimLens.length];
+    for (int i = 0; i < dimLens.length; i++) {
+      complexKeyGenerators[i] = KeyGeneratorFactory.getKeyGenerator(new int[] { dimLens[i] });
+    }
+    return complexKeyGenerators;
+  }
+
 }