You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/17 13:35:42 UTC

carbondata git commit: [CARBONDATA-2477]Fixed No dictionary Complex type with double/date/decimal data type

Repository: carbondata
Updated Branches:
  refs/heads/master cf1b50bcc -> 6297ea0b4


[CARBONDATA-2477]Fixed No dictionary Complex type with double/date/decimal data type

Problem: Creating a table through the SDK with a no-dictionary complex type fails when a child of the complex type has a double/date/decimal data type.
Solution: The complex type validation was rejecting double/date/decimal data; that restriction has been removed.
Also changed the no-dictionary complex type storage format: the length is now stored as a short instead of an int to reduce storage space.

This closes #2304


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6297ea0b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6297ea0b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6297ea0b

Branch: refs/heads/master
Commit: 6297ea0b4092539fa0aa2c6f772d6984850c6110
Parents: cf1b50b
Author: kumarvishal09 <ku...@gmail.com>
Authored: Mon May 14 14:17:38 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu May 17 19:05:30 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datastore/ColumnType.java   | 14 ++-
 .../core/datastore/page/ColumnPage.java         | 82 ++++++++++++++++--
 .../core/datastore/page/ComplexColumnPage.java  | 16 +++-
 .../core/datastore/page/LazyColumnPage.java     | 13 ++-
 .../datastore/page/SafeFixLengthColumnPage.java | 25 +++++-
 .../datastore/page/SafeVarLengthColumnPage.java | 21 +++++
 .../page/UnsafeFixLengthColumnPage.java         | 39 ++++++++-
 .../datastore/page/VarLengthColumnPageBase.java | 90 ++++++++++++++++++--
 .../page/encoding/ColumnPageEncoder.java        |  9 +-
 .../scan/complextypes/PrimitiveQueryType.java   |  4 +-
 .../core/scan/complextypes/StructQueryType.java |  8 +-
 .../apache/carbondata/core/util/ByteUtil.java   |  9 ++
 ...ransactionalCarbonTableWithComplexType.scala | 76 ++++++++++++++++-
 .../processing/datatypes/ArrayDataType.java     |  7 ++
 .../processing/datatypes/GenericDataType.java   |  4 +
 .../processing/datatypes/PrimitiveDataType.java | 17 ++--
 .../processing/datatypes/StructDataType.java    | 30 +++----
 .../carbondata/processing/store/TablePage.java  |  6 +-
 .../sdk/file/CarbonWriterBuilder.java           |  9 --
 19 files changed, 407 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
index f98307b..8bbf12d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/ColumnType.java
@@ -31,7 +31,13 @@ public enum ColumnType {
   COMPLEX,
 
   // measure column, numerical data type
-  MEASURE;
+  MEASURE,
+
+  COMPLEX_STRUCT,
+
+  COMPLEX_ARRAY,
+
+  COMPLEX_PRIMITIVE;
 
   public static ColumnType valueOf(int ordinal) {
     if (ordinal == GLOBAL_DICTIONARY.ordinal()) {
@@ -44,6 +50,12 @@ public enum ColumnType {
       return COMPLEX;
     } else if (ordinal == MEASURE.ordinal()) {
       return MEASURE;
+    } else if (ordinal == COMPLEX_STRUCT.ordinal()) {
+      return COMPLEX_STRUCT;
+    } else if (ordinal == COMPLEX_ARRAY.ordinal()) {
+      return COMPLEX_ARRAY;
+    } else if (ordinal == COMPLEX_PRIMITIVE.ordinal()) {
+      return COMPLEX_PRIMITIVE;
     } else {
       throw new RuntimeException("create ColumnType with invalid ordinal: " + ordinal);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 68269fb..69ed437 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -22,6 +22,7 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
@@ -153,6 +154,19 @@ public abstract class ColumnPage {
     }
   }
 
+  private static ColumnPage createFixLengthByteArrayPage(TableSpec.ColumnSpec columnSpec,
+      DataType dataType, int pageSize, int eachValueSize) {
+    if (unsafe) {
+      try {
+        return new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize);
+      } catch (MemoryException e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      return new SafeFixLengthColumnPage(columnSpec, dataType, pageSize, eachValueSize);
+    }
+  }
+
   private static ColumnPage createPage(TableSpec.ColumnSpec columnSpec, DataType dataType,
       int pageSize) {
     if (DataTypes.isDecimal(dataType)) {
@@ -281,8 +295,31 @@ public abstract class ColumnPage {
   }
 
   private static ColumnPage newLVBytesPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedByteArray) throws MemoryException {
-    return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray);
+      byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
+    return VarLengthColumnPageBase.newLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+  }
+
+  private static ColumnPage newComplexLVBytesPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedByteArray, int lvLength) throws MemoryException {
+    return VarLengthColumnPageBase
+        .newComplexLVBytesColumnPage(columnSpec, lvEncodedByteArray, lvLength);
+  }
+
+  private static ColumnPage newFixedByteArrayPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedByteArray, int eachValueSize) throws MemoryException {
+    int pageSize = lvEncodedByteArray.length / eachValueSize;
+    ColumnPage fixLengthByteArrayPage =
+        createFixLengthByteArrayPage(columnSpec, columnSpec.getSchemaDataType(), pageSize,
+            eachValueSize);
+    byte[] data = null;
+    int offset = 0;
+    for (int i = 0; i < pageSize; i++) {
+      data = new byte[eachValueSize];
+      System.arraycopy(lvEncodedByteArray, offset, data, 0, eachValueSize);
+      fixLengthByteArrayPage.putBytes(i, data);
+      offset += eachValueSize;
+    }
+    return fixLengthByteArrayPage;
   }
 
   /**
@@ -607,6 +644,20 @@ public abstract class ColumnPage {
   public abstract byte[] getLVFlattenedBytePage() throws IOException;
 
   /**
+   * For complex type columns
+   * @return
+   * @throws IOException
+   */
+  public abstract byte[] getComplexChildrenLVFlattenedBytePage() throws IOException;
+
+  /**
+   * For complex type columns
+   * @return
+   * @throws IOException
+   */
+  public abstract byte[] getComplexParentFlattenedBytePage() throws IOException;
+
+  /**
    * For decimals
    */
   public abstract byte[] getDecimalPage();
@@ -638,6 +689,13 @@ public abstract class ColumnPage {
       return compressor.compressDouble(getDoublePage());
     } else if (DataTypes.isDecimal(dataType)) {
       return compressor.compressByte(getDecimalPage());
+    } else if (dataType == DataTypes.BYTE_ARRAY
+        && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+      return compressor.compressByte(getComplexChildrenLVFlattenedBytePage());
+    } else if (dataType == DataTypes.BYTE_ARRAY && (
+        columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT
+            || columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY)) {
+      return compressor.compressByte(getComplexParentFlattenedBytePage());
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return compressor.compressByte(getLVFlattenedBytePage());
     } else {
@@ -676,12 +734,26 @@ public abstract class ColumnPage {
     } else if (storeDataType == DataTypes.DOUBLE) {
       double[] doubleData = compressor.unCompressDouble(compressedData, offset, length);
       return newDoublePage(columnSpec, doubleData);
+    } else if (storeDataType == DataTypes.BYTE_ARRAY
+        && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) {
+      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+      return newComplexLVBytesPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    } else if (storeDataType == DataTypes.BYTE_ARRAY
+        && columnSpec.getColumnType() == ColumnType.COMPLEX_STRUCT) {
+      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+      return newFixedByteArrayPage(columnSpec, lvVarBytes,
+          CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+    } else if (storeDataType == DataTypes.BYTE_ARRAY
+        && columnSpec.getColumnType() == ColumnType.COMPLEX_ARRAY) {
+      byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
+      return newFixedByteArrayPage(columnSpec, lvVarBytes, CarbonCommonConstants.LONG_SIZE_IN_BYTE);
     } else if (storeDataType == DataTypes.BYTE_ARRAY) {
       byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length);
-      return newLVBytesPage(columnSpec, lvVarBytes);
+      return newLVBytesPage(columnSpec, lvVarBytes, CarbonCommonConstants.INT_SIZE_IN_BYTE);
     } else {
-      throw new UnsupportedOperationException("unsupport uncompress column page: " +
-          meta.getStoreDataType());
+      throw new UnsupportedOperationException(
+          "unsupport uncompress column page: " + meta.getStoreDataType());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 8cb18e9..07dc837 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.ColumnType;
 
 // Represent a complex column page, e.g. Array, Struct type column
 public class ComplexColumnPage {
@@ -34,17 +35,20 @@ public class ComplexColumnPage {
   private List<ArrayList<byte[]>> complexColumnData;
 
   // depth is the number of column after complex type is expanded. It is from 1 to N
-  private final int depth;
-
   private final int pageSize;
 
-  public ComplexColumnPage(int pageSize, int depth) {
+  private int depth;
+
+  private List<ColumnType> complexColumnType;
+
+  public ComplexColumnPage(int pageSize, List<ColumnType> complexColumnType) {
     this.pageSize = pageSize;
-    this.depth = depth;
+    this.depth = complexColumnType.size();
     complexColumnData = new ArrayList<>(depth);
     for (int i = 0; i < depth; i++) {
       complexColumnData.add(new ArrayList<byte[]>());
     }
+    this.complexColumnType = complexColumnType;
   }
 
   public void putComplexData(int rowId, int depth, List<byte[]> value) {
@@ -79,4 +83,8 @@ public class ComplexColumnPage {
   public int getPageSize() {
     return pageSize;
   }
+
+  public ColumnType getComplexColumnType(int isDepth) {
+    return complexColumnType.get(isDepth);
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
index ce8aaae..255e078 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -171,7 +172,17 @@ public class LazyColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getLVFlattenedBytePage() {
+  public byte[] getLVFlattenedBytePage() throws IOException {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public byte[] getComplexChildrenLVFlattenedBytePage() {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public byte[] getComplexParentFlattenedBytePage() throws IOException {
     throw new UnsupportedOperationException("internal error");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
index 1e4445a..2304614 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.datastore.page;
 
+import java.io.IOException;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.TableSpec;
@@ -37,11 +38,19 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   private float[] floatData;
   private double[] doubleData;
   private byte[] shortIntData;
+  private byte[][] fixedLengthdata;
+
 
   SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize) {
     super(columnSpec, dataType, pageSize);
   }
 
+  SafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+      int eachRowSize) {
+    super(columnSpec, dataType, pageSize);
+    this.fixedLengthdata = new byte[pageSize][];
+  }
+
   /**
    * Set byte value at rowId
    */
@@ -87,7 +96,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
    */
   @Override
   public void putBytes(int rowId, byte[] bytes) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    this.fixedLengthdata[rowId] = bytes;
   }
 
   @Override
@@ -173,7 +182,7 @@ public class SafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public byte[] getBytes(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    return this.fixedLengthdata[rowId];
   }
 
   /**
@@ -241,10 +250,20 @@ public class SafeFixLengthColumnPage extends ColumnPage {
   }
 
   @Override
-  public byte[] getLVFlattenedBytePage() {
+  public byte[] getLVFlattenedBytePage() throws IOException {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
 
+  @Override
+  public byte[] getComplexChildrenLVFlattenedBytePage() {
+    throw new UnsupportedOperationException("internal error");
+  }
+
+  @Override
+  public byte[] getComplexParentFlattenedBytePage() throws IOException {
+    throw new UnsupportedOperationException("internal error");
+  }
+
   /**
    * Set byte values to page
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
index 782b9dc..7b1ad20 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java
@@ -82,6 +82,27 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase {
   }
 
   @Override
+  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    for (byte[] byteArrayDatum : byteArrayData) {
+      out.writeShort((short)byteArrayDatum.length);
+      out.write(byteArrayDatum);
+    }
+    return stream.toByteArray();
+  }
+
+  @Override
+  public byte[] getComplexParentFlattenedBytePage() throws IOException {
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(stream);
+    for (byte[] byteArrayDatum : byteArrayData) {
+      out.write(byteArrayDatum);
+    }
+    return stream.toByteArray();
+  }
+
+  @Override
   public byte[][] getByteArrayPage() {
     return byteArrayData;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
index c88dc0b..6847ab9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java
@@ -44,6 +44,8 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   // base offset of memoryBlock
   private long baseOffset;
 
+  private int eachRowSize;
+
   private final long taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
 
   private static final int byteBits = DataTypes.BYTE.getSizeBits();
@@ -77,6 +79,19 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
     }
   }
 
+  UnsafeFixLengthColumnPage(TableSpec.ColumnSpec columnSpec, DataType dataType, int pageSize,
+      int eachRowSize)
+      throws MemoryException {
+    this(columnSpec, dataType, pageSize);
+    this.eachRowSize = eachRowSize;
+    if (dataType == DataTypes.BYTE_ARRAY) {
+      memoryBlock =
+          UnsafeMemoryManager.allocateMemoryWithRetry(taskId, (long) pageSize * eachRowSize);
+      baseAddress = memoryBlock.getBaseObject();
+      baseOffset = memoryBlock.getBaseOffset();
+    }
+  }
+
   @Override
   public void putByte(int rowId, byte value) {
     long offset = rowId << byteBits;
@@ -118,7 +133,11 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public void putBytes(int rowId, byte[] bytes) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    // copy the data to memory
+    long offset = (long)rowId * eachRowSize;
+    CarbonUnsafe.getUnsafe()
+        .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+            baseOffset + offset, bytes.length);
   }
 
   @Override
@@ -183,7 +202,14 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
 
   @Override
   public byte[] getBytes(int rowId) {
-    throw new UnsupportedOperationException("invalid data type: " + dataType);
+    // creating a row
+    byte[] data = new byte[eachRowSize];
+    //copy the row from memory block based on offset
+    // offset position will be index * each column value length
+    CarbonUnsafe.getUnsafe().copyMemory(memoryBlock.getBaseObject(),
+        baseOffset + ((long)rowId * eachRowSize), data,
+        CarbonUnsafe.BYTE_ARRAY_OFFSET, eachRowSize);
+    return data;
   }
 
   @Override public byte[] getDecimalPage() {
@@ -267,6 +293,15 @@ public class UnsafeFixLengthColumnPage extends ColumnPage {
   public byte[] getLVFlattenedBytePage() {
     throw new UnsupportedOperationException("invalid data type: " + dataType);
   }
+  @Override
+  public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+    throw new UnsupportedOperationException("invalid data type: " + dataType);
+  }
+
+  @Override
+  public byte[] getComplexParentFlattenedBytePage() {
+    throw new UnsupportedOperationException("internal error");
+  }
 
   @Override
   public void setBytePage(byte[] byteData) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index c6062c1..901758a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
 import org.apache.carbondata.core.memory.MemoryBlock;
@@ -115,7 +116,8 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     int size = decimalConverter.getSize();
     if (size < 0) {
       return getLVBytesColumnPage(columnSpec, lvEncodedBytes,
-          DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()));
+          DataTypes.createDecimalType(columnSpec.getPrecision(), columnSpec.getScale()),
+          CarbonCommonConstants.INT_SIZE_IN_BYTE);
     } else {
       // Here the size is always fixed.
       return getDecimalColumnPage(columnSpec, lvEncodedBytes, size);
@@ -125,9 +127,17 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   /**
    * Create a new column page based on the LV (Length Value) encoded bytes
    */
-  static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes)
-      throws MemoryException {
-    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY);
+  static ColumnPage newLVBytesColumnPage(TableSpec.ColumnSpec columnSpec, byte[] lvEncodedBytes,
+      int lvLength) throws MemoryException {
+    return getLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
+  }
+
+  /**
+   * Create a new column page based on the LV (Length Value) encoded bytes
+   */
+  static ColumnPage newComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedBytes, int lvLength) throws MemoryException {
+    return getComplexLVBytesColumnPage(columnSpec, lvEncodedBytes, DataTypes.BYTE_ARRAY, lvLength);
   }
 
   private static ColumnPage getDecimalColumnPage(TableSpec.ColumnSpec columnSpec,
@@ -161,7 +171,7 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
   }
 
   private static ColumnPage getLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
-      byte[] lvEncodedBytes, DataType dataType)
+      byte[] lvEncodedBytes, DataType dataType, int lvLength)
       throws MemoryException {
     // extract length and data, set them to rowOffset and unsafe memory correspondingly
     int rowId = 0;
@@ -176,11 +186,48 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
       length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset);
       rowOffset.add(offset);
       rowLength.add(length);
-      lvEncodedOffset += 4 + length;
+      lvEncodedOffset += lvLength + length;
+      rowId++;
+    }
+    rowOffset.add(offset);
+    VarLengthColumnPageBase page =
+        getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
+            rowLength, offset);
+    return page;
+  }
+
+  private static ColumnPage getComplexLVBytesColumnPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedBytes, DataType dataType, int lvLength)
+      throws MemoryException {
+    // extract length and data, set them to rowOffset and unsafe memory correspondingly
+    int rowId = 0;
+    List<Integer> rowOffset = new ArrayList<>();
+    List<Integer> rowLength = new ArrayList<>();
+    int length;
+    int offset;
+    int lvEncodedOffset = 0;
+
+    // extract Length field in input and calculate total length
+    for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) {
+      length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset);
+      rowOffset.add(offset);
+      rowLength.add(length);
+      lvEncodedOffset += lvLength + length;
       rowId++;
     }
     rowOffset.add(offset);
 
+    VarLengthColumnPageBase page =
+        getVarLengthColumnPage(columnSpec, lvEncodedBytes, dataType, lvLength, rowId, rowOffset,
+            rowLength, offset);
+    return page;
+  }
+
+  private static VarLengthColumnPageBase getVarLengthColumnPage(TableSpec.ColumnSpec columnSpec,
+      byte[] lvEncodedBytes, DataType dataType, int lvLength, int rowId, List<Integer> rowOffset,
+      List<Integer> rowLength, int offset) throws MemoryException {
+    int lvEncodedOffset;
+    int length;
     int numRows = rowId;
 
     VarLengthColumnPageBase page;
@@ -202,10 +249,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     lvEncodedOffset = 0;
     for (int i = 0; i < numRows; i++) {
       length = rowLength.get(i);
-      page.putBytes(i, lvEncodedBytes, lvEncodedOffset + 4, length);
-      lvEncodedOffset += 4 + length;
+      page.putBytes(i, lvEncodedBytes, lvEncodedOffset + lvLength, length);
+      lvEncodedOffset += lvLength + length;
     }
-
     return page;
   }
 
@@ -353,6 +399,32 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
     return data;
   }
 
+  @Override public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException {
+    // output LV encoded byte array
+    int offset = 0;
+    byte[] data = new byte[totalLength + pageSize * 2];
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+      ByteUtil.setShort(data, offset, length);
+      copyBytes(rowId, data, offset + 2, length);
+      offset += 2 + length;
+    }
+    return data;
+  }
+
+  @Override
+  public byte[] getComplexParentFlattenedBytePage() throws IOException {
+    // output LV encoded byte array
+    int offset = 0;
+    byte[] data = new byte[totalLength];
+    for (int rowId = 0; rowId < pageSize; rowId++) {
+      short length = (short) (rowOffset[rowId + 1] - rowOffset[rowId]);
+      copyBytes(rowId, data, offset, length);
+      offset += length;
+    }
+    return data;
+  }
+
   @Override
   public void convertValue(ColumnPageValueConverter codec) {
     throw new UnsupportedOperationException("invalid data type: " + dataType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
index dfdca02..8bff5cc 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java
@@ -141,15 +141,16 @@ public abstract class ColumnPageEncoder {
     Iterator<byte[][]> iterator = input.iterator();
     while (iterator.hasNext()) {
       byte[][] subColumnPage = iterator.next();
-      encodedPages[index++] = encodeChildColumn(subColumnPage);
+      encodedPages[index] = encodeChildColumn(subColumnPage, input.getComplexColumnType(index));
+      index++;
     }
     return encodedPages;
   }
 
-  private static EncodedColumnPage encodeChildColumn(byte[][] data)
+  private static EncodedColumnPage encodeChildColumn(byte[][] data, ColumnType complexDataType)
       throws IOException, MemoryException {
-    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec.newInstance("complex_inner_column",
-        DataTypes.BYTE_ARRAY, ColumnType.COMPLEX);
+    TableSpec.ColumnSpec spec = TableSpec.ColumnSpec
+        .newInstance("complex_inner_column", DataTypes.BYTE_ARRAY, complexDataType);
     ColumnPage page = ColumnPage.wrapByteArrayPage(spec, data);
     ColumnPageEncoder encoder = new DirectCompressCodec(DataTypes.BYTE_ARRAY).createEncoder(null);
     return encoder.encode(page);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index edae4da..899957e 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -96,7 +96,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
       int pageNumber, DataOutputStream dataOutputStream) throws IOException {
     byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
     if (!this.isDictionary) {
-      dataOutputStream.writeInt(currentVal.length);
+      dataOutputStream.writeShort(currentVal.length);
     }
     dataOutputStream.write(currentVal);
   }
@@ -120,7 +120,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
       actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
     } else if (!isDictionary) {
       // No Dictionary Columns
-      int size = dataBuffer.getInt();
+      int size = dataBuffer.getShort();
       byte[] value = new byte[size];
       dataBuffer.get(value, 0, size);
       if (dataType == DataTypes.DATE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 9ff8252..301eb5a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -81,8 +81,8 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
       int pageNumber, DataOutputStream dataOutputStream) throws IOException {
     byte[] input = copyBlockDataChunk(dimensionColumnDataChunks, rowNumber, pageNumber);
     ByteBuffer byteArray = ByteBuffer.wrap(input);
-    int childElement = byteArray.getInt();
-    dataOutputStream.writeInt(childElement);
+    int childElement = byteArray.getShort();
+    dataOutputStream.writeShort(childElement);
     if (childElement > 0) {
       for (int i = 0; i < childElement; i++) {
         children.get(i)
@@ -102,13 +102,11 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
   }
 
   @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
-    int childLength = dataBuffer.getInt();
+    int childLength = dataBuffer.getShort();
     Object[] fields = new Object[childLength];
     for (int i = 0; i < childLength; i++) {
       fields[i] =  children.get(i).getDataBasedOnDataType(dataBuffer);
     }
-
     return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index 661384c..1df60c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -521,6 +521,10 @@ public final class ByteUtil {
         (((int)bytes[offset + 2] & 0xff) << 8) + ((int)bytes[offset + 3] & 0xff);
   }
 
+  public static int toShort(byte[] bytes, int offset) {
+    return (((int)bytes[offset] & 0xff) << 8) + ((int)bytes[offset + 1] & 0xff);
+  }
+
   public static void setInt(byte[] data, int offset, int value) {
     data[offset] = (byte) (value >> 24);
     data[offset + 1] = (byte) (value >> 16);
@@ -528,6 +532,11 @@ public final class ByteUtil {
     data[offset + 3] = (byte) value;
   }
 
+  public static void setShort(byte[] data, int offset, int value) {
+    data[offset] = (byte) (value >> 8);
+    data[offset + 1] = (byte) value;
+  }
+
   /**
    * long => byte[]
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
index ccfb231..7f9023b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTableWithComplexType.scala
@@ -17,9 +17,11 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
-import java.io.{File}
+import java.io.File
 import java.util
+import java.util.ArrayList
 
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.avro
 import org.apache.commons.io.FileUtils
@@ -28,6 +30,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -195,8 +198,8 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
 
     val fld = new util.ArrayList[StructField]
     fld.add(new StructField("DoorNum",
-        DataTypes.createArrayType(DataTypes.createStructType(address)),
-        subFld))
+      DataTypes.createArrayType(DataTypes.createStructType(address)),
+      subFld))
     // array of struct of struct
     val doorNum = new util.ArrayList[StructField]
     doorNum.add(new StructField("FloorNum",
@@ -229,4 +232,71 @@ class TestNonTransactionalCarbonTableWithComplexType extends QueryTest with Befo
     // drop table should not delete the files
     cleanTestData()
   }
+
+  test("test multi level support : array of array of array of with Double data type") {
+    cleanTestData()
+    val mySchema =  """ {
+                      |	"name": "address",
+                      |	"type": "record",
+                      |	"fields": [
+                      |		{
+                      |			"name": "name",
+                      |			"type": "string"
+                      |		},
+                      |		{
+                      |			"name": "age",
+                      |			"type": "int"
+                      |		},
+                      |		{
+                      |   "name" :"my_address",
+                      |   "type" :{
+                      |							"name": "my_address",
+                      |							"type": "record",
+                      |							"fields": [
+                      |               {
+                      |									"name": "Temperaturetest",
+                      |									"type": "double"
+                      |								}
+                      |							]
+                      |       }
+                      |			}
+                      |	]
+                      |} """.stripMargin
+
+    val jsonvalue=
+      """{
+        |"name" :"babu",
+        |"age" :12,
+        |"my_address" :{ "Temperaturetest" :123 }
+        |}
+      """.stripMargin
+    val pschema= org.apache.avro.Schema.parse(mySchema)
+
+    val records=new JsonAvroConverter().convertToGenericDataRecord(jsonvalue.getBytes(CharEncoding.UTF_8),pschema)
+
+    val fieds = new Array[Field](3)
+    fieds(0)=new Field("name",DataTypes.STRING);
+    fieds(1)=new Field("age",DataTypes.INT)
+
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("Temperature", DataTypes.DOUBLE))
+    fieds(2) = new Field("my_address", "struct", fld)
+
+
+    val writer=CarbonWriter.builder().withSchema(new Schema(fieds)).outputPath(writerPath).buildWriterForAvroInput()
+    writer.write(records)
+    writer.close()
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    // TODO: Add a validation of the query result; it is currently only printed via show()
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    cleanTestData()
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index cc2619e..4ce80a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -286,4 +287,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
   public GenericDataType<ArrayObject> deepCopy() {
     return new ArrayDataType(this.outputArrayIndex, this.dataCounter, this.children.deepCopy());
   }
+
+  @Override
+  public void getChildrenType(List<ColumnType> type) {
+    type.add(ColumnType.COMPLEX_ARRAY);
+    children.getChildrenType(type);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index f48a91d..8b1ccf2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -155,4 +156,7 @@ public interface GenericDataType<T> {
    * clone self for multithread access (for complex type processing in table page)
    */
   GenericDataType<T> deepCopy();
+
+  void getChildrenType(List<ColumnType> type);
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 481c811..7450b82 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.dictionary.client.DictionaryClient;
@@ -362,17 +363,17 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
   private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value)
       throws IOException {
-    dataOutputStream.writeInt(value.length);
+    dataOutputStream.writeShort(value.length);
     dataOutputStream.write(value);
   }
 
   private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder)
       throws IOException {
     if (this.carbonDimension.getDataType() == DataTypes.STRING) {
-      dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
+      dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
     } else {
-      dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
+      dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length);
       dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
     }
     String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
@@ -396,8 +397,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       KeyGenerator[] generator)
       throws IOException, KeyGenException {
     if (!this.isDictionary) {
-      int sizeOfData = byteArrayInput.getInt();
-      dataOutputStream.writeInt(sizeOfData);
+      int sizeOfData = byteArrayInput.getShort();
+      dataOutputStream.writeShort(sizeOfData);
       byte[] bb = new byte[sizeOfData];
       byteArrayInput.get(bb, 0, sizeOfData);
       dataOutputStream.write(bb);
@@ -438,7 +439,7 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   @Override public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
       ByteBuffer inputArray) {
     if (!isDictionary) {
-      byte[] key = new byte[inputArray.getInt()];
+      byte[] key = new byte[inputArray.getShort()];
       inputArray.get(key);
       columnsArray.get(outputArrayIndex).add(key);
     } else {
@@ -506,4 +507,8 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
     return dataType;
   }
+
+  public void getChildrenType(List<ColumnType> type) {
+    type.add(ColumnType.COMPLEX_PRIMITIVE);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index bb3da6c..b66eef7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
@@ -153,7 +154,7 @@ public class StructDataType implements GenericDataType<StructObject> {
 
   @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream,
       BadRecordLogHolder logHolder) throws IOException, DictionaryGenerationException {
-    dataOutputStream.writeInt(children.size());
+    dataOutputStream.writeShort(children.size());
     if (input == null) {
       for (int i = 0; i < children.size(); i++) {
         children.get(i).writeByteArray(null, dataOutputStream, logHolder);
@@ -191,9 +192,8 @@ public class StructDataType implements GenericDataType<StructObject> {
   @Override public void parseComplexValue(ByteBuffer byteArrayInput,
       DataOutputStream dataOutputStream, KeyGenerator[] generator)
       throws IOException, KeyGenException {
-    int childElement = byteArrayInput.getInt();
-    dataOutputStream.writeInt(childElement);
-
+    short childElement = byteArrayInput.getShort();
+    dataOutputStream.writeShort(childElement);
     for (int i = 0; i < childElement; i++) {
       if (children.get(i) instanceof PrimitiveDataType) {
         if (children.get(i).getIsColumnDictionary()) {
@@ -254,17 +254,10 @@ public class StructDataType implements GenericDataType<StructObject> {
   @Override
   public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray,
       ByteBuffer inputArray) {
-
-    ByteBuffer b = ByteBuffer.allocate(8);
-    int childElement = inputArray.getInt();
-    b.putInt(childElement);
-    if (childElement == 0) {
-      b.putInt(0);
-    } else {
-      b.putInt(children.get(0).getDataCounter());
-    }
+    ByteBuffer b = ByteBuffer.allocate(2);
+    int childElement = inputArray.getShort();
+    b.putShort((short)childElement);
     columnsArray.get(this.outputArrayIndex).add(b.array());
-
     for (int i = 0; i < childElement; i++) {
       if (children.get(i) instanceof PrimitiveDataType) {
         PrimitiveDataType child = ((PrimitiveDataType) children.get(i));
@@ -301,7 +294,7 @@ public class StructDataType implements GenericDataType<StructObject> {
    */
   @Override
   public void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize) {
-    blockKeySizeWithComplex.add(8);
+    blockKeySizeWithComplex.add(2);
     for (int i = 0; i < children.size(); i++) {
       children.get(i).fillBlockKeySize(blockKeySizeWithComplex, primitiveBlockKeySize);
     }
@@ -327,4 +320,11 @@ public class StructDataType implements GenericDataType<StructObject> {
     }
     return new StructDataType(childrenClone, this.outputArrayIndex, this.dataCounter);
   }
+
+  public void getChildrenType(List<ColumnType> type) {
+    type.add(ColumnType.COMPLEX_STRUCT);
+    for (int i = 0; i < children.size(); i++) {
+      children.get(i).getChildrenType(type);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 26a634b..5408193 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -50,7 +50,6 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 
-
 /**
  * Represent a page data for all columns, we store its data in columnar layout, so that
  * all processing apply to TablePage can be done in vectorized fashion.
@@ -205,8 +204,9 @@ public class TablePage {
 
     // initialize the page if first row
     if (rowId == 0) {
-      int depthInComplexColumn = complexDataType.getColsCount();
-      complexDimensionPages[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
+      List<ColumnType> complexColumnType = new ArrayList<>();
+      complexDataType.getChildrenType(complexColumnType);
+      complexDimensionPages[index] = new ComplexColumnPage(pageSize, complexColumnType);
     }
 
     int depthInComplexColumn = complexDimensionPages[index].getDepth();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6297ea0b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index a01f0d7..36be65f 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -459,7 +459,6 @@ public class CarbonWriterBuilder {
         }
         if (field.getChildren() != null && field.getChildren().size() > 0) {
           if (field.getDataType().getName().equalsIgnoreCase("ARRAY")) {
-            checkForUnsupportedDataTypes(field.getChildren().get(0).getDataType());
             // Loop through the inner columns and for a StructData
             DataType complexType =
                 DataTypes.createArrayType(field.getChildren().get(0).getDataType());
@@ -470,7 +469,6 @@ public class CarbonWriterBuilder {
             List<StructField> structFieldsArray =
                 new ArrayList<StructField>(field.getChildren().size());
             for (StructField childFld : field.getChildren()) {
-              checkForUnsupportedDataTypes(childFld.getDataType());
               structFieldsArray
                   .add(new StructField(childFld.getFieldName(), childFld.getDataType()));
             }
@@ -496,13 +494,6 @@ public class CarbonWriterBuilder {
     }
   }
 
-  private void checkForUnsupportedDataTypes(DataType dataType) {
-    if (dataType == DataTypes.DOUBLE || dataType == DataTypes.DATE || DataTypes
-        .isDecimal(dataType)) {
-      throw new RuntimeException("Unsupported data type: " + dataType.getName());
-    }
-  }
-
   /**
    * Save the schema of the {@param table} to {@param persistFilePath}
    * @param table table object containing schema