You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/01 17:59:07 UTC

[2/2] carbondata git commit: [CARBONDATA-2388][SDK]Avro Record Complex Type Implementation

[CARBONDATA-2388][SDK]Avro Record Complex Type Implementation

Avro Complex DataType Support.

Supported Avro complex types: ARRAY and RECORD.

Supported Carbon complex types: ARRAY and STRUCT.

Adds SDK support for handling complex data types.

Carbon complex type support:
- Support for no-dictionary fields.
- Bug fixes for existing complex types.

This closes #2209


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3202cf51
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3202cf51
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3202cf51

Branch: refs/heads/master
Commit: 3202cf517ab1a6805deab27baeb5f9d44094ee87
Parents: 7edef8f
Author: sounakr <so...@gmail.com>
Authored: Mon Apr 23 10:18:10 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue May 1 23:28:53 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/datatype/DataTypes.java       |   6 +-
 .../core/metadata/schema/SchemaReader.java      |   4 +-
 .../schema/table/TableSchemaBuilder.java        |  17 +-
 .../schema/table/column/CarbonColumn.java       |  39 ++++
 .../impl/DictionaryBasedResultCollector.java    |   6 +-
 .../core/scan/complextypes/ArrayQueryType.java  |   6 +-
 .../scan/complextypes/PrimitiveQueryType.java   |  38 +++-
 .../core/scan/complextypes/StructQueryType.java |   7 +-
 .../core/scan/executor/util/QueryUtil.java      |   4 +
 .../core/scan/filter/GenericQueryType.java      |   3 +-
 .../executer/RowLevelFilterExecuterImpl.java    |   2 +-
 .../core/scan/result/BlockletScannedResult.java |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   1 +
 .../scan/complextypes/ArrayQueryTypeTest.java   |   4 +-
 .../complextypes/PrimitiveQueryTypeTest.java    |   6 +-
 .../scan/complextypes/StructQueryTypeTest.java  |   2 +-
 integration/spark-common-test/pom.xml           |   6 +
 .../TestNonTransactionalCarbonTable.scala       | 100 ++++++++++-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  28 ++-
 .../apache/spark/util/SparkTypeConverter.scala  |  58 ++++--
 .../processing/datatypes/ArrayDataType.java     |  19 +-
 .../processing/datatypes/GenericDataType.java   |  14 +-
 .../processing/datatypes/PrimitiveDataType.java | 179 ++++++++++++++++---
 .../processing/datatypes/StructDataType.java    |  29 ++-
 .../loading/CarbonDataLoadConfiguration.java    |  25 ++-
 .../processing/loading/DataField.java           |   1 +
 .../loading/DataLoadProcessBuilder.java         |   2 +
 .../converter/impl/FieldEncoderFactory.java     |  25 +--
 .../CarbonRowDataWriterProcessorStepImpl.java   |   3 +-
 .../sort/sortdata/SortParameters.java           |   6 +-
 .../store/CarbonFactDataHandlerModel.java       |   8 +-
 .../carbondata/processing/store/TablePage.java  |   4 +-
 .../util/CarbonDataProcessorUtil.java           |  28 ++-
 .../carbondata/sdk/file/AvroCarbonWriter.java   |  27 ++-
 .../sdk/file/CarbonWriterBuilder.java           |  18 +-
 .../org/apache/carbondata/sdk/file/Field.java   | 122 ++++++++++++-
 .../sdk/file/AvroCarbonWriterTest.java          | 107 ++++++++++-
 .../streaming/CarbonStreamInputFormat.java      |   3 +
 .../streaming/CarbonStreamRecordReader.java     |   2 +-
 39 files changed, 816 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
index ad21eaa..dc89a41 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
@@ -63,9 +63,9 @@ public class DataTypes {
   static final int SHORT_INT_TYPE_ID = 16;
   static final int LEGACY_LONG_TYPE_ID = 17;
   static final int DECIMAL_TYPE_ID = 10;
-  static final int ARRAY_TYPE_ID = 11;
-  static final int STRUCT_TYPE_ID = 12;
-  static final int MAP_TYPE_ID = 13;
+  public static final int ARRAY_TYPE_ID = 11;
+  public static final int STRUCT_TYPE_ID = 12;
+  public static final int MAP_TYPE_ID = 13;
 
   /**
    * create a DataType instance from uniqueId of the DataType

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index be3906b..57370f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -82,9 +82,7 @@ public class SchemaReader {
 
   public static TableInfo inferSchema(AbsoluteTableIdentifier identifier,
       boolean isCarbonFileProvider) throws IOException {
-    // This routine is going to infer schema from the carbondata file footer
-    // Convert the ColumnSchema -> TableSchema -> TableInfo.
-    // Return the TableInfo.
+
     org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil
         .inferSchema(identifier.getTablePath(), identifier.getTableName(), isCarbonFileProvider);
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index 617d58f..2c29be0 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.datatype.StructField;
+import org.apache.carbondata.core.metadata.datatype.StructType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
@@ -111,7 +112,8 @@ public class TableSchemaBuilder {
     if (isSortColumn ||
         field.getDataType() == DataTypes.STRING ||
         field.getDataType() == DataTypes.DATE ||
-        field.getDataType() == DataTypes.TIMESTAMP) {
+        field.getDataType() == DataTypes.TIMESTAMP ||
+        DataTypes.isStructType(field.getDataType())) {
       newColumn.setDimensionColumn(true);
     } else {
       newColumn.setDimensionColumn(false);
@@ -128,6 +130,9 @@ public class TableSchemaBuilder {
     newColumn.setColumnUniqueId(field.getFieldName());
     newColumn.setColumnReferenceId(newColumn.getColumnUniqueId());
     newColumn.setEncodingList(createEncoding(field.getDataType(), isSortColumn));
+    if (field.getDataType().isComplexType()) {
+      newColumn.setNumberOfChild(((StructType) field.getDataType()).getFields().size());
+    }
     if (DataTypes.isDecimal(field.getDataType())) {
       DecimalType decimalType = (DecimalType) field.getDataType();
       newColumn.setPrecision(decimalType.getPrecision());
@@ -139,10 +144,18 @@ public class TableSchemaBuilder {
     } else {
       otherColumns.add(newColumn);
     }
-
     if (newColumn.isDimensionColumn()) {
       newColumn.setUseInvertedIndex(true);
     }
+    if (field.getDataType().isComplexType()) {
+      if (((StructType) field.getDataType()).getFields().size() > 0) {
+        // This field has children.
+        List<StructField> fields = ((StructType) field.getDataType()).getFields();
+        for (int i = 0; i < fields.size(); i ++) {
+          addColumn(fields.get(i), false);
+        }
+      }
+    }
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
index c888418..e19e329 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonColumn.java
@@ -52,6 +52,21 @@ public class CarbonColumn implements Serializable {
    */
   protected ColumnIdentifier columnIdentifier;
 
+  /**
+   * Date Format
+   */
+  private String dateFormat;
+
+  /**
+   * TimeStamp Format.
+   */
+  private String timestampFormat;
+
+  /**
+   * useActualData
+   */
+  private boolean useActualData;
+
   public CarbonColumn(ColumnSchema columnSchema, int ordinal, int schemaOrdinal) {
     this.columnSchema = columnSchema;
     this.ordinal = ordinal;
@@ -180,4 +195,28 @@ public class CarbonColumn implements Serializable {
   public int getSchemaOrdinal() {
     return this.schemaOrdinal;
   }
+
+  public String getDateFormat() {
+    return dateFormat;
+  }
+
+  public void setDateFormat(String dateFormat) {
+    this.dateFormat = dateFormat;
+  }
+
+  public String getTimestampFormat() {
+    return timestampFormat;
+  }
+
+  public void setTimestampFormat(String timestampFormat) {
+    this.timestampFormat = timestampFormat;
+  }
+
+  public boolean getUseActualData() {
+    return useActualData;
+  }
+
+  public void setUseActualData(boolean useActualData) {
+    this.useActualData = useActualData;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
index bb048aa..60f14a4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedResultCollector.java
@@ -134,6 +134,10 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
           row[order[i]] =
               DataTypeUtil.getDataBasedOnDataType(scannedResult.getBlockletId(), DataTypes.STRING);
         }
+      } else if (complexDataTypeArray[i]) {
+        // Complex Type With No Dictionary Encoding.
+        row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
+            .getDataBasedOnDataType(ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
       } else {
         row[order[i]] = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(
             noDictionaryKeys[noDictionaryColumnIndex++],
@@ -146,7 +150,7 @@ public class DictionaryBasedResultCollector extends AbstractScannedResultCollect
       }
     } else if (complexDataTypeArray[i]) {
       row[order[i]] = comlexDimensionInfoMap.get(queryDimensions[i].getDimension().getOrdinal())
-          .getDataBasedOnDataTypeFromSurrogates(
+          .getDataBasedOnDataType(
               ByteBuffer.wrap(complexTypeKeyArray[complexTypeColumnIndex++]));
       dictionaryColumnIndex++;
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
index 24c1c9b..81e9651 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryType.java
@@ -85,14 +85,14 @@ public class ArrayQueryType extends ComplexQueryType implements GenericQueryType
     children.fillRequiredBlockData(blockChunkHolder);
   }
 
-  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
-    int dataLength = surrogateData.getInt();
+  @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
+    int dataLength = dataBuffer.getInt();
     if (dataLength == -1) {
       return null;
     }
     Object[] data = new Object[dataLength];
     for (int i = 0; i < dataLength; i++) {
-      data[i] = children.getDataBasedOnDataTypeFromSurrogates(surrogateData);
+      data[i] = children.getDataBasedOnDataType(dataBuffer);
     }
     return DataTypeUtil.getDataTypeConverter().wrapWithGenericArrayData(data);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
index 8c75caf..2db590b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.keygenerator.mdkey.Bits;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.filter.GenericQueryType;
 import org.apache.carbondata.core.scan.processor.RawBlockletColumnChunks;
 import org.apache.carbondata.core.util.DataTypeUtil;
@@ -43,8 +44,10 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
 
   private boolean isDirectDictionary;
 
+  private boolean isDictionary;
+
   public PrimitiveQueryType(String name, String parentname, int blockIndex,
-      org.apache.carbondata.core.metadata.datatype.DataType dataType, int keySize,
+      DataType dataType, int keySize,
       Dictionary dictionary, boolean isDirectDictionary) {
     super(name, parentname, blockIndex);
     this.dataType = dataType;
@@ -53,6 +56,7 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     this.name = name;
     this.parentname = parentname;
     this.isDirectDictionary = isDirectDictionary;
+    this.isDictionary = (dictionary != null && isDirectDictionary == false);
   }
 
   @Override public void addChildren(GenericQueryType children) {
@@ -84,6 +88,9 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
       DimensionRawColumnChunk[] rawColumnChunks, int rowNumber,
       int pageNumber, DataOutputStream dataOutputStream) throws IOException {
     byte[] currentVal = copyBlockDataChunk(rawColumnChunks, rowNumber, pageNumber);
+    if (!this.isDictionary) {
+      dataOutputStream.writeInt(currentVal.length);
+    }
     dataOutputStream.write(currentVal);
   }
 
@@ -92,20 +99,35 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery
     readBlockDataChunk(blockChunkHolder);
   }
 
-  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
-    byte[] data = new byte[keySize];
-    surrogateData.get(data);
-    Bits bit = new Bits(new int[]{keySize * 8});
-    int surrgateValue = (int)bit.getKeyArray(data, 0)[0];
+  @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
     Object actualData = null;
+
     if (isDirectDictionary) {
-      DirectDictionaryGenerator directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-          .getDirectDictionaryGenerator(dataType);
+      // Direct Dictionary Column
+      byte[] data = new byte[keySize];
+      dataBuffer.get(data);
+      Bits bit = new Bits(new int[] { keySize * 8 });
+      int surrgateValue = (int) bit.getKeyArray(data, 0)[0];
+      DirectDictionaryGenerator directDictionaryGenerator =
+          DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(dataType);
       actualData = directDictionaryGenerator.getValueFromSurrogate(surrgateValue);
+    } else if (!isDictionary) {
+      // No Dictionary Columns
+      int size = dataBuffer.getInt();
+      byte[] value = new byte[size];
+      dataBuffer.get(value, 0, size);
+      actualData = DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(value, this.dataType);
     } else {
+      // Dictionary Column
+      byte[] data = new byte[keySize];
+      dataBuffer.get(data);
+      Bits bit = new Bits(new int[] { keySize * 8 });
+      int surrgateValue = (int) bit.getKeyArray(data, 0)[0];
       String dictionaryValueForKey = dictionary.getDictionaryValueForKey(surrgateValue);
       actualData = DataTypeUtil.getDataBasedOnDataType(dictionaryValueForKey, this.dataType);
     }
+
     return actualData;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
index 1064694..9ff8252 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/StructQueryType.java
@@ -101,13 +101,14 @@ public class StructQueryType extends ComplexQueryType implements GenericQueryTyp
     }
   }
 
-  @Override public Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData) {
-    int childLength = surrogateData.getInt();
+  @Override public Object getDataBasedOnDataType(ByteBuffer dataBuffer) {
+    int childLength = dataBuffer.getInt();
     Object[] fields = new Object[childLength];
     for (int i = 0; i < childLength; i++) {
-      fields[i] =  children.get(i).getDataBasedOnDataTypeFromSurrogates(surrogateData);
+      fields[i] =  children.get(i).getDataBasedOnDataType(dataBuffer);
     }
 
     return DataTypeUtil.getDataTypeConverter().wrapWithGenericRow(fields);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index 1765efa..cc31efc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -777,6 +777,10 @@ public class QueryUtil {
         boolean isDirectDictionary = CarbonUtil
             .hasEncoding(dimension.getListOfChildDimensions().get(i).getEncoder(),
                 Encoding.DIRECT_DICTIONARY);
+        boolean isDictionary = CarbonUtil
+            .hasEncoding(dimension.getListOfChildDimensions().get(i).getEncoder(),
+                Encoding.DICTIONARY);
+
         parentQueryType.addChildren(
             new PrimitiveQueryType(dimension.getListOfChildDimensions().get(i).getColName(),
                 dimension.getColName(), ++parentBlockIndex,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
index b5d8d82..fe65669 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/GenericQueryType.java
@@ -43,5 +43,6 @@ public interface GenericQueryType {
 
   void fillRequiredBlockData(RawBlockletColumnChunks blockChunkHolder) throws IOException;
 
-  Object getDataBasedOnDataTypeFromSurrogates(ByteBuffer surrogateData);
+  Object getDataBasedOnDataType(ByteBuffer dataBuffer);
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index 2bd49ed..7e92aef 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -440,7 +440,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
           complexType.parseBlocksAndReturnComplexColumnByteArray(
               blockChunkHolder.getDimensionRawColumnChunks(), index, pageIndex, dataOutputStream);
           record[dimColumnEvaluatorInfo.getRowIndex()] = complexType
-              .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+              .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
           byteStream.close();
         } catch (IOException e) {
           LOGGER.info(e.getMessage());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
index 35d4f51..b85945f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/result/BlockletScannedResult.java
@@ -289,7 +289,7 @@ public abstract class BlockletScannedResult {
               pageFilteredRowId == null ? j : pageFilteredRowId[pageCounter][j], pageCounter,
               dataOutput);
           Object data = vectorInfos[i].genericQueryType
-              .getDataBasedOnDataTypeFromSurrogates(ByteBuffer.wrap(byteStream.toByteArray()));
+              .getDataBasedOnDataType(ByteBuffer.wrap(byteStream.toByteArray()));
           vector.putObject(vectorOffset++, data);
         } catch (IOException e) {
           LOGGER.error(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index a5351a0..b42167d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2348,6 +2348,7 @@ public final class CarbonUtil {
     try {
       fistFilePath = filePaths.get(0);
     } catch (Exception e) {
+      // Check if we can infer the schema from the hive metastore.
       LOGGER.error("CarbonData file is not present in the table location");
       throw new IOException("CarbonData file is not present in the table location");
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
index e882f4e..3c2c374 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/ArrayQueryTypeTest.java
@@ -41,7 +41,7 @@ public class ArrayQueryTypeTest {
     surrogateData.put(1, (byte) 0xFF);
     surrogateData.put(2, (byte) 0xFF);
     surrogateData.put(3, (byte) 0xFF);
-    assertNull(arrayQueryType.getDataBasedOnDataTypeFromSurrogates(surrogateData));
+    assertNull(arrayQueryType.getDataBasedOnDataType(surrogateData));
   }
 
   @Test public void testGetDataBasedOnDataTypeFromSurrogates() {
@@ -50,6 +50,6 @@ public class ArrayQueryTypeTest {
     arrayQueryType.setName("testName");
     arrayQueryType.setParentname("testName");
     arrayQueryType.addChildren(arrayQueryType);
-    assertNotNull(arrayQueryType.getDataBasedOnDataTypeFromSurrogates(surrogateData));
+    assertNotNull(arrayQueryType.getDataBasedOnDataType(surrogateData));
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
index 3236f16..757f2b9 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryTypeTest.java
@@ -84,7 +84,7 @@ public class PrimitiveQueryTypeTest {
     Object expectedValue = 1313043000000L;
 
     Object actualValue =
-        primitiveQueryTypeForTimeStamp.getDataBasedOnDataTypeFromSurrogates(surrogateData);
+        primitiveQueryTypeForTimeStamp.getDataBasedOnDataType(surrogateData);
     assertEquals(expectedValue, actualValue);
   }
 
@@ -97,9 +97,9 @@ public class PrimitiveQueryTypeTest {
       }
     };
     Object expectedValue = primitiveQueryTypeForTimeStampForIsDictionaryFalse
-        .getDataBasedOnDataTypeFromSurrogates(surrogateData);
+        .getDataBasedOnDataType(surrogateData);
     Object actualValue = primitiveQueryTypeForTimeStampForIsDictionaryFalse
-        .getDataBasedOnDataTypeFromSurrogates(surrogateData);
+        .getDataBasedOnDataType(surrogateData);
     assertEquals(expectedValue, actualValue);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
index b09d9dd..18b85c6 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/complextypes/StructQueryTypeTest.java
@@ -47,7 +47,7 @@ public class StructQueryTypeTest {
     structQueryType.addChildren(arrayQueryType);
     List children = new ArrayList();
     children.add(arrayQueryType);
-    assertNotNull(structQueryType.getDataBasedOnDataTypeFromSurrogates(surrogateData));
+    assertNotNull(structQueryType.getDataBasedOnDataType(surrogateData));
   }
 
   @Test public void testGetColsCount() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index d70fa2e..9f184e6 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -136,6 +136,12 @@
       <artifactId>jmockit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>tech.allegro.schema.json2avro</groupId>
+      <artifactId>converter</artifactId>
+      <version>0.2.5</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
index f1bda31..a6af4a6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -17,13 +17,13 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
-import java.io.{File, FileFilter}
+import java.io.{File, FileFilter, IOException}
 import java.util
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Assert
+import org.junit.{Assert, Test}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -31,9 +31,14 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
-
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.commons.lang.CharEncoding
+import tech.allegro.schema.json2avro.converter.JsonAvroConverter
+
+import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
 
 class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
 
@@ -637,7 +642,6 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
     cleanTestData()
   }
 
-
   test("test huge data write with one batch having bad record") {
 
     val exception =
@@ -648,4 +652,88 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
       .contains("Data load failed due to bad record"))
 
   }
-}
+
+
+  def buildAvroTestData(rows: Int, options: util.Map[String, String]): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    val newAvroSchema = "{" + " \"type\" : \"record\", " + "  \"name\" : \"userInfo\", " +
+                        "  \"namespace\" : \"my.example\", " +
+                        "  \"fields\" : [{\"name\" : \"username\", " +
+                        "  \"type\" : \"string\", " + "  \"default\" : \"NONE\"}, " +
+                        " {\"name\" : \"age\", " + " \"type\" : \"int\", " +
+                        " \"default\" : -1}, " + "{\"name\" : \"address\", " +
+                        "   \"type\" : { " + "  \"type\" : \"record\", " +
+                        "   \"name\" : \"mailing_address\", " + "  \"fields\" : [ {" +
+                        "        \"name\" : \"street\", " +
+                        "       \"type\" : \"string\", " +
+                        "       \"default\" : \"NONE\"}, { " + " \"name\" : \"city\", " +
+                        "  \"type\" : \"string\", " + "  \"default\" : \"NONE\"}, " +
+                        "                 ]}, " + " \"default\" : {} " + " } " + "}"
+    val mySchema = "{" + "  \"name\": \"address\", " + "   \"type\": \"record\", " +
+                   "    \"fields\": [  " +
+                   "  { \"name\": \"name\", \"type\": \"string\"}, " +
+                   "  { \"name\": \"age\", \"type\": \"int\"}, " + "  { " +
+                   "    \"name\": \"address\", " + "      \"type\": { " +
+                   "    \"type\" : \"record\", " + "        \"name\" : \"my_address\", " +
+                   "        \"fields\" : [ " +
+                   "    {\"name\": \"street\", \"type\": \"string\"}, " +
+                   "    {\"name\": \"city\", \"type\": \"string\"} " + "  ]} " + "  } " +
+                   "] " + "}"
+    val json = "{\"name\":\"bob\", \"age\":10, \"address\" : {\"street\":\"abc\", " +
+               "\"city\":\"bang\"}}"
+    // conversion to GenericData.Record
+    val nn = new org.apache.avro.Schema.Parser().parse(mySchema)
+    val converter = new JsonAvroConverter
+    val record = converter
+      .convertToGenericDataRecord(json.getBytes(CharEncoding.UTF_8), nn)
+    val fields = new Array[Field](3)
+    fields(0) = new Field("name", DataTypes.STRING)
+    fields(1) = new Field("age", DataTypes.STRING)
+    // fields[1] = new Field("age", DataTypes.INT);
+    val fld = new util.ArrayList[StructField]
+    fld.add(new StructField("street", DataTypes.STRING))
+    fld.add(new StructField("city", DataTypes.STRING))
+    fields(2) = new Field("address", "struct", fld)
+    try {
+      val writer = CarbonWriter.builder.withSchema(new Schema(fields))
+        .outputPath(writerPath).isTransactionalTable(false).buildWriterForAvroInput
+      var i = 0
+      while (i < rows) {
+        writer.write(record)
+        i = i + 1
+      }
+      writer.close()
+    }
+    catch {
+      case e: Exception => {
+        e.printStackTrace()
+        Assert.fail(e.getMessage)
+      }
+    }
+  }
+
+  def buildAvroTestDataSingleFile(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildAvroTestData(3, null)
+  }
+
+  test("Read sdk writer Avro output ") {
+    buildAvroTestDataSingleFile()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    sql("select * from sdkOutputTable").show(false)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(
+      Row("bob", "10", Row("abc","bang")),
+      Row("bob", "10", Row("abc","bang")),
+      Row("bob", "10", Row("abc","bang"))))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c61471a..2d24abf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -42,8 +42,9 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable}
+import org.apache.carbondata.core.metadata.schema.table
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.ThriftWriter
@@ -223,9 +224,26 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val tablePath = identifier.getTablePath
     val wrapperTableInfo =
     if (inferSchema) {
-      val thriftTableInfo = schemaConverter
-        .fromWrapperToExternalTableInfo(SchemaReader.inferSchema(identifier, false),
-          dbName, tableName)
+      val carbonTbl = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
+      val tblInfoFromCache = if (carbonTbl != null) {
+        carbonTbl.getTableInfo
+      } else {
+        null
+      }
+
+      val thriftTableInfo : TableInfo = if (tblInfoFromCache != null) {
+        // If the TableInfo is present in the CarbonMetadata cache,
+        // take it from the cache rather than inferring it from
+        // the CarbonData file.
+        schemaConverter
+          .fromWrapperToExternalTableInfo(tblInfoFromCache, dbName, tableName)
+      } else {
+        schemaConverter
+          .fromWrapperToExternalTableInfo(SchemaReader
+                      .inferSchema(identifier, false),
+            dbName, tableName)
+      }
+
       val wrapperTableInfo =
         schemaConverter
           .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
index 65210b8..fe11b98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/SparkTypeConverter.scala
@@ -97,16 +97,32 @@ private[spark] object SparkTypeConverter {
   def getStructChildren(table: CarbonTable, dimName: String): String = {
     table.getChildren(dimName).asScala.map(childDim => {
       childDim.getDataType.getName.toLowerCase match {
-        case "array" => s"${
+        case "array" => if (table.isTransactionalTable) {s"${
           childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(table, childDim.getColName) }>"
-        case "struct" => s"${
+        }:array<${ getArrayChildren(table, childDim.getColName) }>" } else {
+          // For a non-transactional table, the children of struct columns
+          // are not prefixed with their parent's name.
+          s"${
+            childDim.getColName
+          }:array<${ getArrayChildren(table, childDim.getColName) }>"
+        }
+        case "struct" => if (table.isTransactionalTable) { s"${
           childDim.getColName.substring(dimName.length + 1)
         }:struct<${ table.getChildren(childDim.getColName)
           .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
-        }>"
-        case dType => s"${ childDim.getColName
+        }>"} else {
+          s"${
+            childDim.getColName
+          }:struct<${ table.getChildren(childDim.getColName)
+            .asScala.map(f => s"${ recursiveMethod(table, childDim.getColName, f) }").mkString(",")
+          }>"
+        }
+        case dType => if (table.isTransactionalTable) {
+          s"${ childDim.getColName
           .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
+        } else {
+          s"${ childDim.getColName}:${ addDecimalScaleAndPrecision(childDim, dType) }"
+        }
       }
     }).mkString(",")
   }
@@ -123,13 +139,31 @@ private[spark] object SparkTypeConverter {
   private def recursiveMethod(
       table: CarbonTable, dimName: String, childDim: CarbonDimension) = {
     childDim.getDataType.getName.toLowerCase match {
-      case "array" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:array<${ getArrayChildren(table, childDim.getColName) }>"
-      case "struct" => s"${
-        childDim.getColName.substring(dimName.length + 1)
-      }:struct<${ getStructChildren(table, childDim.getColName) }>"
-      case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+      case "array" => if (table.isTransactionalTable) {
+        s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:array<${ getArrayChildren(table, childDim.getColName) }>"
+      } else {
+        // For a non-transactional table, the children of struct columns
+        // are not prefixed with their parent's name.
+        s"${
+          childDim.getColName
+        }:array<${ getArrayChildren(table, childDim.getColName) }>"
+      }
+      case "struct" => if (table.isTransactionalTable) {
+        s"${
+          childDim.getColName.substring(dimName.length + 1)
+        }:struct<${ getStructChildren(table, childDim.getColName) }>"
+      } else {
+        s"${
+          childDim.getColName
+        }:struct<${ getStructChildren(table, childDim.getColName) }>"
+      }
+      case dType => if (table.isTransactionalTable) {
+        s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
+      } else {
+        s"${ childDim.getColName }:${ dType }"
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 86a6744..fb198ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -147,6 +147,10 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
 
   }
 
+  @Override public boolean getIsColumnDictionary() {
+    return true;
+  }
+
   @Override
   public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream)
       throws IOException, DictionaryGenerationException {
@@ -168,22 +172,21 @@ public class ArrayDataType implements GenericDataType<ArrayObject> {
     children.fillCardinality(dimCardWithComplex);
   }
 
-  /**
-   * parse byte array and bit pack
-   */
   @Override
-  public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
-      KeyGenerator[] generator) throws IOException, KeyGenException {
+  public void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+      KeyGenerator[] generator)
+      throws IOException, KeyGenException {
     int dataLength = byteArrayInput.getInt();
 
     dataOutputStream.writeInt(dataLength);
     if (children instanceof PrimitiveDataType) {
-      dataOutputStream.writeInt(generator[children.getSurrogateIndex()].getKeySizeInBytes());
+      if (children.getIsColumnDictionary()) {
+        dataOutputStream.writeInt(generator[children.getSurrogateIndex()].getKeySizeInBytes());
+      }
     }
     for (int i = 0; i < dataLength; i++) {
-      children.parseAndBitPack(byteArrayInput, dataOutputStream, generator);
+      children.parseComplexValue(byteArrayInput, dataOutputStream, generator);
     }
-
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 77c00d9..21ad83d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -72,15 +72,23 @@ public interface GenericDataType<T> {
   void setSurrogateIndex(int surrIndex);
 
   /**
-   * converts integer surrogate to bit packed surrogate value
+   * Returns true in case the column has Dictionary Encoding.
+   * @return
+   */
+  boolean getIsColumnDictionary();
+
+  /**
+   * Parse the Complex Datatype from the ByteBuffer.
    * @param byteArrayInput
    * @param dataOutputStream
    * @param generator
+   * @return
    * @throws IOException
    * @throws KeyGenException
    */
-  void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
-      KeyGenerator[] generator) throws IOException, KeyGenException;
+  void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+      KeyGenerator[] generator)
+      throws IOException, KeyGenException;
 
   /**
    * @return columns count of each complex type

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 55fa23b..fa60bf6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.processing.datatypes;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -39,13 +40,16 @@ import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
 import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
 import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
 
 /**
  * Primitive DataType stateless object used in data loading
@@ -94,6 +98,13 @@ public class PrimitiveDataType implements GenericDataType<Object> {
 
   private CarbonDimension carbonDimension;
 
+  private boolean isDictionary;
+
+  private boolean isEmptyBadRecord;
+
+  private String nullformat;
+
+
   private PrimitiveDataType(int outputArrayIndex, int dataCounter) {
     this.outputArrayIndex = outputArrayIndex;
     this.dataCounter = dataCounter;
@@ -105,28 +116,45 @@ public class PrimitiveDataType implements GenericDataType<Object> {
    * @param name
    * @param parentname
    * @param columnId
+   * @param dimensionOrdinal
+   * @param isDictionary
    */
-  public PrimitiveDataType(String name, String parentname, String columnId, int dimensionOrdinal) {
+  public PrimitiveDataType(String name, String parentname, String columnId, int dimensionOrdinal,
+      boolean isDictionary, String nullformat, boolean isEmptyBadRecord) {
     this.name = name;
     this.parentname = parentname;
     this.columnId = columnId;
+    this.isDictionary = isDictionary;
+    this.nullformat = nullformat;
+    this.isEmptyBadRecord = isEmptyBadRecord;
   }
 
   /**
-   * constructor
-   *
-   * @param name
+   * Constructor
+   * @param carbonColumn
    * @param parentname
    * @param columnId
+   * @param carbonDimension
+   * @param cache
+   * @param absoluteTableIdentifier
+   * @param client
+   * @param useOnePass
+   * @param localCache
+   * @param nullFormat
+   * @param isEmptyBadRecords
    */
-  public PrimitiveDataType(String name, String parentname, String columnId,
+  public PrimitiveDataType(CarbonColumn carbonColumn, String parentname, String columnId,
       CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      Map<Object, Integer> localCache) {
-    this.name = name;
+      Map<Object, Integer> localCache, String nullFormat, Boolean isEmptyBadRecords) {
+    this.name = carbonColumn.getColName();
     this.parentname = parentname;
     this.columnId = columnId;
     this.carbonDimension = carbonDimension;
+    this.isDictionary = isDictionaryDimension(carbonDimension);
+    this.nullformat = nullFormat;
+    this.isEmptyBadRecord = isEmptyBadRecords;
+
     DictionaryColumnUniqueIdentifier identifier =
         new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
             carbonDimension.getColumnIdentifier(), carbonDimension.getDataType());
@@ -134,7 +162,7 @@ public class PrimitiveDataType implements GenericDataType<Object> {
       if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(carbonDimension.getDataType()));
-      } else {
+      } else if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
         Dictionary dictionary = null;
         if (useOnePass) {
           if (CarbonUtil.isFileExistsForGivenColumn(identifier)) {
@@ -160,6 +188,14 @@ public class PrimitiveDataType implements GenericDataType<Object> {
     }
   }
 
+  private boolean isDictionaryDimension(CarbonDimension carbonDimension) {
+    if (carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   /*
    * primitive column will not have any child column
    */
@@ -211,38 +247,123 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   /*
    * set surrogate index
    */
-  @Override
-  public void setSurrogateIndex(int surrIndex) {
-    index = surrIndex;
+  @Override public void setSurrogateIndex(int surrIndex) {
+    if (this.carbonDimension != null && !this.carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+      index = 0;
+    } else if (this.carbonDimension == null && isDictionary == false) {
+      index = 0;
+    } else {
+      index = surrIndex;
+    }
+  }
+
+  @Override public boolean getIsColumnDictionary() {
+    return isDictionary;
   }
 
   @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream)
       throws IOException, DictionaryGenerationException {
+
     String parsedValue =
         input == null ? null : DataTypeUtil.parseValue(input.toString(), carbonDimension);
-    Integer surrogateKey;
-    if (null == parsedValue) {
-      surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
-    } else {
-      surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
-      if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+    if (this.isDictionary) {
+      Integer surrogateKey;
+      if (null == parsedValue) {
         surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+      } else {
+        surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+        if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+          surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+        }
+      }
+      dataOutputStream.writeInt(surrogateKey);
+    } else {
+      // Transform into ByteArray for No Dictionary.
+      // TODO: refactor to cover all the cases present in NonDictionaryFieldConverterImpl
+      if (null == parsedValue && this.carbonDimension.getDataType() != DataTypes.STRING) {
+        updateNullValue(dataOutputStream);
+      } else if (null == parsedValue || parsedValue.equals(nullformat)) {
+        updateNullValue(dataOutputStream);
+      } else {
+        String dateFormat = null;
+        if (this.carbonDimension.getDataType() == DataTypes.DATE) {
+          dateFormat = this.carbonDimension.getDateFormat();
+        } else if (this.carbonDimension.getDataType() == DataTypes.TIMESTAMP) {
+          dateFormat = this.carbonDimension.getTimestampFormat();
+        }
+
+        try {
+          if (!this.carbonDimension.getUseActualData()) {
+            byte[] value = DataTypeUtil.getBytesBasedOnDataTypeForNoDictionaryColumn(parsedValue,
+                this.carbonDimension.getDataType(), dateFormat);
+            if (this.carbonDimension.getDataType() == DataTypes.STRING
+                && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+              throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+                  + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+            }
+            updateValueToByteStream(dataOutputStream, value);
+          } else {
+            Object value = DataTypeUtil.getDataDataTypeForNoDictionaryColumn(parsedValue,
+                this.carbonDimension.getDataType(), dateFormat);
+            if (this.carbonDimension.getDataType() == DataTypes.STRING
+                && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+              throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+                  + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+            }
+            if (parsedValue.length() > 0) {
+              updateValueToByteStream(dataOutputStream,
+                  parsedValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+            } else {
+              updateNullValue(dataOutputStream);
+            }
+          }
+        } catch (CarbonDataLoadingException e) {
+          throw e;
+        } catch (Throwable ex) {
+          // TODO: implement the Bad Records LogHolder handling here,
+          // as in NonDictionaryFieldConverterImpl.
+          throw ex;
+        }
       }
     }
-    dataOutputStream.writeInt(surrogateKey);
   }
 
-  @Override
-  public void fillCardinality(List<Integer> dimCardWithComplex) {
+  private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value)
+      throws IOException {
+    dataOutputStream.writeInt(value.length);
+    dataOutputStream.write(value);
+  }
+
+  private void updateNullValue(DataOutputStream dataOutputStream) throws IOException {
+    if (this.carbonDimension.getDataType() == DataTypes.STRING) {
+      dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY);
+    } else {
+      dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY);
+    }
+  }
+
+  @Override public void fillCardinality(List<Integer> dimCardWithComplex) {
+    if (!this.carbonDimension.hasEncoding(Encoding.DICTIONARY)) {
+      return;
+    }
     dimCardWithComplex.add(dictionaryGenerator.size());
   }
 
   @Override
-  public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
-      KeyGenerator[] generator) throws IOException, KeyGenException {
-    int data = byteArrayInput.getInt();
-    byte[] v = generator[index].generateKey(new int[] { data });
-    dataOutputStream.write(v);
+  public void parseComplexValue(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+      KeyGenerator[] generator)
+      throws IOException, KeyGenException {
+    if (!this.isDictionary) {
+      int sizeOfData = byteArrayInput.getInt();
+      dataOutputStream.writeInt(sizeOfData);
+      byte[] bb = new byte[sizeOfData];
+      byteArrayInput.get(bb, 0, sizeOfData);
+      dataOutputStream.write(bb);
+    } else {
+      int data = byteArrayInput.getInt();
+      byte[] v = generator[index].generateKey(new int[] { data });
+      dataOutputStream.write(v);
+    }
   }
 
   /*
@@ -326,8 +447,16 @@ public class PrimitiveDataType implements GenericDataType<Object> {
   @Override
   public GenericDataType<Object> deepCopy() {
     PrimitiveDataType dataType = new PrimitiveDataType(this.outputArrayIndex, 0);
+    dataType.carbonDimension = this.carbonDimension;
+    dataType.isDictionary = this.isDictionary;
+    dataType.parentname = this.parentname;
+    dataType.columnId = this.columnId;
+    dataType.dictionaryGenerator = this.dictionaryGenerator;
+    dataType.isEmptyBadRecord = this.isEmptyBadRecord;
+    dataType.nullformat = this.nullformat;
     dataType.setKeySize(this.keySize);
     dataType.setSurrogateIndex(this.index);
+
     return dataType;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index a340ab1..36899a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -146,6 +146,10 @@ public class StructDataType implements GenericDataType<StructObject> {
 
   }
 
+  @Override public boolean getIsColumnDictionary() {
+    return true;
+  }
+
   @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream)
       throws IOException, DictionaryGenerationException {
     dataOutputStream.writeInt(children.size());
@@ -175,20 +179,29 @@ public class StructDataType implements GenericDataType<StructObject> {
     }
   }
 
-  /*
-   * parse bytearray and bit pack
+  /**
+   *
+   * @param byteArrayInput
+   * @param dataOutputStream
+   * @param generator
+   * @return
+   * @throws IOException
+   * @throws KeyGenException
    */
-  @Override
-  public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
-      KeyGenerator[] generator) throws IOException, KeyGenException {
+  @Override public void parseComplexValue(ByteBuffer byteArrayInput,
+      DataOutputStream dataOutputStream, KeyGenerator[] generator)
+      throws IOException, KeyGenException {
     int childElement = byteArrayInput.getInt();
     dataOutputStream.writeInt(childElement);
+
     for (int i = 0; i < childElement; i++) {
       if (children.get(i) instanceof PrimitiveDataType) {
-        dataOutputStream.writeInt(generator[children.get(i).getSurrogateIndex()]
-            .getKeySizeInBytes());
+        if (children.get(i).getIsColumnDictionary()) {
+          dataOutputStream
+              .writeInt(generator[children.get(i).getSurrogateIndex()].getKeySizeInBytes());
+        }
       }
-      children.get(i).parseAndBitPack(byteArrayInput, dataOutputStream, generator);
+      children.get(i).parseComplexValue(byteArrayInput, dataOutputStream, generator);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index ad1c84c..9418efb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -87,7 +87,9 @@ public class CarbonDataLoadConfiguration {
 
   private int noDictionaryCount;
 
-  private int complexColumnCount;
+  private int complexDictionaryColumnCount;
+
+  private int complexNonDictionaryColumnCount;
 
   /**
    * schema updated time stamp to be used for restructure scenarios
@@ -128,13 +130,17 @@ public class CarbonDataLoadConfiguration {
       CarbonColumn column = dataField.getColumn();
       if (column.isDimension()) {
         dimensionCount++;
-        if (!dataField.hasDictionaryEncoding()) {
+        if (column.isComplex()) {
+          if (!dataField.hasDictionaryEncoding()) {
+            complexNonDictionaryColumnCount++;
+          } else {
+            complexDictionaryColumnCount++;
+          }
+        } else if (!dataField.hasDictionaryEncoding()) {
           noDictionaryCount++;
         }
       }
-      if (column.isComplex()) {
-        complexColumnCount++;
-      }
+
       if (column.isMeasure()) {
         measureCount++;
       }
@@ -153,8 +159,8 @@ public class CarbonDataLoadConfiguration {
     return noDictionaryCount;
   }
 
-  public int getComplexColumnCount() {
-    return complexColumnCount;
+  public int getComplexDictionaryColumnCount() {
+    return complexDictionaryColumnCount;
   }
 
   public int getMeasureCount() {
@@ -387,4 +393,9 @@ public class CarbonDataLoadConfiguration {
   public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
     this.carbonTransactionalTable = carbonTransactionalTable;
   }
+
+  public int getComplexNonDictionaryColumnCount() {
+    return complexNonDictionaryColumnCount;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
index dc2fbbb..4018f75 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
@@ -69,5 +69,6 @@ public class DataField implements Serializable {
 
   public void setUseActualData(boolean useActualData) {
     this.useActualData = useActualData;
+    this.column.setUseActualData(useActualData);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 9c1d113..028c404 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -230,8 +230,10 @@ public final class DataLoadProcessBuilder {
       DataField dataField = new DataField(column);
       if (column.getDataType() == DataTypes.DATE) {
         dataField.setDateFormat(loadModel.getDateFormat());
+        column.setDateFormat(loadModel.getDateFormat());
       } else if (column.getDataType() == DataTypes.TIMESTAMP) {
         dataField.setTimestampFormat(loadModel.getTimestampformat());
+        column.setTimestampFormat(loadModel.getTimestampformat());
       }
       if (column.isComplex()) {
         complexDataFields.add(dataField);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 193d192..567a8b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -115,8 +115,8 @@ public class FieldEncoderFactory {
         }
       } else if (dataField.getColumn().isComplex()) {
         return new ComplexFieldConverterImpl(
-            createComplexType(dataField, cache, absoluteTableIdentifier,
-                client, useOnePass, localCache), index);
+            createComplexDataType(dataField, cache, absoluteTableIdentifier,
+                client, useOnePass, localCache, index, nullFormat, isEmptyBadRecord), index);
       } else {
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
       }
@@ -128,12 +128,13 @@ public class FieldEncoderFactory {
   /**
    * Create parser for the carbon column.
    */
-  private static GenericDataType createComplexType(DataField dataField,
+  private static GenericDataType createComplexDataType(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      Map<Object, Integer> localCache) {
+      Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) {
     return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
-        absoluteTableIdentifier, client, useOnePass, localCache);
+        absoluteTableIdentifier, client, useOnePass, localCache, index, nullFormat,
+        isEmptyBadRecords);
   }
 
   /**
@@ -141,10 +142,11 @@ public class FieldEncoderFactory {
    *
    * @return GenericDataType
    */
+
   private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
       AbsoluteTableIdentifier absoluteTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      Map<Object, Integer> localCache) {
+      Map<Object, Integer> localCache, int index, String nullFormat, Boolean isEmptyBadRecords) {
     DataType dataType = carbonColumn.getDataType();
     if (DataTypes.isArrayType(dataType)) {
       List<CarbonDimension> listOfChildDimensions =
@@ -155,7 +157,7 @@ public class FieldEncoderFactory {
       for (CarbonDimension dimension : listOfChildDimensions) {
         arrayDataType.addChildren(
             createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
-                client, useOnePass, localCache));
+                client, useOnePass, localCache, index, nullFormat, isEmptyBadRecords));
       }
       return arrayDataType;
     } else if (DataTypes.isStructType(dataType)) {
@@ -167,15 +169,16 @@ public class FieldEncoderFactory {
       for (CarbonDimension dimension : dimensions) {
         structDataType.addChildren(
             createComplexType(dimension, carbonColumn.getColName(), cache, absoluteTableIdentifier,
-                client, useOnePass, localCache));
+                client, useOnePass, localCache, index, nullFormat, isEmptyBadRecords));
       }
       return structDataType;
     } else if (DataTypes.isMapType(dataType)) {
       throw new UnsupportedOperationException("Complex type Map is not supported yet");
     } else {
-      return new PrimitiveDataType(carbonColumn.getColName(), parentName,
-          carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
-          absoluteTableIdentifier, client, useOnePass, localCache);
+      return new PrimitiveDataType(carbonColumn, parentName, carbonColumn.getColumnId(),
+          (CarbonDimension) carbonColumn, cache, absoluteTableIdentifier, client, useOnePass,
+          localCache, nullFormat, isEmptyBadRecords);
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 6cf1dcd..ea75cd2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -110,7 +110,8 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       writeCounter = new long[iterators.length];
       dimensionWithComplexCount = configuration.getDimensionCount();
       noDictWithComplextCount =
-          configuration.getNoDictionaryCount() + configuration.getComplexColumnCount();
+          configuration.getNoDictionaryCount() + configuration.getComplexDictionaryColumnCount()
+              + configuration.getComplexNonDictionaryColumnCount();
       dimensionCount = configuration.getDimensionCount() - noDictWithComplextCount;
       isNoDictionaryDimensionColumn =
           CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 4d333ed..705350c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -364,9 +364,11 @@ public class SortParameters implements Serializable {
     parameters.setTaskNo(configuration.getTaskNo());
     parameters.setMeasureColCount(configuration.getMeasureCount());
     parameters.setDimColCount(
-        configuration.getDimensionCount() - configuration.getComplexColumnCount());
+        configuration.getDimensionCount() - (configuration.getComplexDictionaryColumnCount()
+            + configuration.getComplexNonDictionaryColumnCount()));
     parameters.setNoDictionaryCount(configuration.getNoDictionaryCount());
-    parameters.setComplexDimColCount(configuration.getComplexColumnCount());
+    parameters.setComplexDimColCount(configuration.getComplexDictionaryColumnCount() + configuration
+        .getComplexNonDictionaryColumnCount());
     parameters.setNoDictionaryDimnesionColumn(
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
     parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index aaf20c7..2ec85fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -199,7 +199,8 @@ public class CarbonFactDataHandlerModel {
 
     int dimensionCount = configuration.getDimensionCount();
     int noDictionaryCount = configuration.getNoDictionaryCount();
-    int complexDimensionCount = configuration.getComplexColumnCount();
+    int complexDimensionCount = configuration.getComplexDictionaryColumnCount() + configuration
+        .getComplexNonDictionaryColumnCount();
     int measureCount = configuration.getMeasureCount();
 
     int simpleDimsCount = dimensionCount - noDictionaryCount - complexDimensionCount;
@@ -210,8 +211,8 @@ public class CarbonFactDataHandlerModel {
     //To Set MDKey Index of each primitive type in complex type
     int surrIndex = simpleDimsCount;
     Iterator<Map.Entry<String, GenericDataType>> complexMap =
-        CarbonDataProcessorUtil.getComplexTypesMap(configuration.getDataFields()).entrySet()
-            .iterator();
+        CarbonDataProcessorUtil.getComplexTypesMap(configuration.getDataFields(), configuration)
+            .entrySet().iterator();
     Map<Integer, GenericDataType> complexIndexMap = new HashMap<>(complexDimensionCount);
     while (complexMap.hasNext()) {
       Map.Entry<String, GenericDataType> complexDataType = complexMap.next();
@@ -609,5 +610,6 @@ public class CarbonFactDataHandlerModel {
   public DataMapWriterListener getDataMapWriterlistener() {
     return dataMapWriterlistener;
   }
+
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index f22d1c1..26a634b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -191,7 +191,7 @@ public class TablePage {
   }
 
   /**
-   * add a complex column into internal member compleDimensionPage
+   * add a complex column into internal member complexDimensionPage
    *
    * @param index          index of the complexDimensionPage
    * @param rowId          Id of the input row
@@ -222,7 +222,7 @@ public class TablePage {
       ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
       ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
       DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput);
-      complexDataType.parseAndBitPack(byteArrayInput, dataOutputStream,
+      complexDataType.parseComplexValue(byteArrayInput, dataOutputStream,
           model.getComplexDimensionKeyGenerator());
       complexDataType.getColumnarDataForComplexType(encodedComplexColumnar,
           ByteBuffer.wrap(byteArrayOutput.toByteArray()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 392ad59..6ba05a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -267,14 +267,24 @@ public final class CarbonDataProcessorUtil {
     return dimString.toString();
   }
 
+  /**
+   * Tells whether the dimension is DICTIONARY encoded, as the text "true" or "false",
+   * so the flag can be appended to the complex type description string.
+   */
+  private static String isDictionaryType(CarbonDimension dimension) {
+    return String.valueOf(dimension.hasEncoding(Encoding.DICTIONARY));
+  }
+
   /**
    * This method will return all the child dimensions under complex dimension
    */
   private static void addAllComplexTypeChildren(CarbonDimension dimension, StringBuilder dimString,
       String parent) {
+
     dimString.append(dimension.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
         .append(dimension.getDataType()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
         .append(parent).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
+        .append(isDictionaryType(dimension)).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
         .append(dimension.getColumnId()).append(CarbonCommonConstants.HASH_SPC_CHARACTER);
     for (int i = 0; i < dimension.getNumberOfChild(); i++) {
       CarbonDimension childDim = dimension.getListOfChildDimensions().get(i);
@@ -284,6 +294,7 @@ public final class CarbonDataProcessorUtil {
         dimString.append(childDim.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
             .append(childDim.getDataType()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
             .append(dimension.getColName()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
+            .append(isDictionaryType(dimension)).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
             .append(childDim.getColumnId()).append(CarbonCommonConstants.COLON_SPC_CHARACTER)
             .append(childDim.getOrdinal()).append(CarbonCommonConstants.HASH_SPC_CHARACTER);
       }
@@ -291,11 +302,21 @@ public final class CarbonDataProcessorUtil {
   }
 
   // TODO: need to simplify it. Not required create string first.
-  public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFields) {
+  public static Map<String, GenericDataType> getComplexTypesMap(DataField[] dataFields,
+      CarbonDataLoadConfiguration configuration) {
     String complexTypeString = getComplexTypeString(dataFields);
+
     if (null == complexTypeString || complexTypeString.equals("")) {
       return new LinkedHashMap<>();
     }
+
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    boolean isEmptyBadRecord = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+            .toString());
+
     Map<String, GenericDataType> complexTypesMap = new LinkedHashMap<String, GenericDataType>();
     String[] hierarchies = complexTypeString.split(CarbonCommonConstants.SEMICOLON_SPC_CHARACTER);
     for (int i = 0; i < hierarchies.length; i++) {
@@ -312,8 +333,9 @@ public final class CarbonDataProcessorUtil {
         } else if (levelInfo[1].toLowerCase().contains(CarbonCommonConstants.STRUCT)) {
           g.addChildren(new StructDataType(levelInfo[0], levelInfo[2], levelInfo[3]));
         } else {
-          g.addChildren(new PrimitiveDataType(levelInfo[0], levelInfo[2], levelInfo[3],
-              Integer.parseInt(levelInfo[4])));
+          g.addChildren(new PrimitiveDataType(levelInfo[0], levelInfo[2], levelInfo[4],
+              Integer.parseInt(levelInfo[5]), levelInfo[3].contains("true"), nullFormat,
+              isEmptyBadRecord));
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
index 458dea8..8f1994a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/AvroCarbonWriter.java
@@ -64,12 +64,12 @@ class AvroCarbonWriter extends CarbonWriter {
     this.writable = new ObjectArrayWritable();
   }
 
-  private String[] avroToCsv(GenericData.Record avroRecord) {
+  private Object[] avroToCsv(GenericData.Record avroRecord) {
     if (avroSchema == null) {
       avroSchema = avroRecord.getSchema();
     }
     List<Schema.Field> fields = avroSchema.getFields();
-    String[] csvField = new String[fields.size()];
+    Object [] csvField = new String[fields.size()];
     for (int i = 0; i < fields.size(); i++) {
       csvField[i] = avroFieldToString(fields.get(i), avroRecord.get(i));
     }
@@ -88,9 +88,25 @@ class AvroCarbonWriter extends CarbonWriter {
       case FLOAT:
         out.append(fieldValue.toString());
         break;
+      case RECORD:
+        List<Schema.Field> fields = fieldType.schema().getFields();
+        String delimiter = null;
+        for (int i = 0; i < fields.size(); i ++) {
+          if (i == 0) {
+            delimiter = "$";
+          } else {
+            delimiter = ":";
+          }
+          if (i != (fields.size() - 1)) {
+            out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)))
+                .append(delimiter);
+          } else {
+            out.append(avroFieldToString(fields.get(i), ((GenericData.Record) fieldValue).get(i)));
+          }
+        }
+        break;
       default:
         throw new UnsupportedOperationException();
-      // TODO: convert complex type
     }
     return out.toString();
   }
@@ -104,7 +120,7 @@ class AvroCarbonWriter extends CarbonWriter {
       GenericData.Record record = (GenericData.Record) object;
 
       // convert Avro record to CSV String[]
-      String[] csvRecord = avroToCsv(record);
+      Object[] csvRecord = avroToCsv(record);
       writable.set(csvRecord);
       recordWriter.write(NullWritable.get(), writable);
     } catch (Exception e) {
@@ -116,8 +132,7 @@ class AvroCarbonWriter extends CarbonWriter {
   /**
    * Flush and close the writer
    */
-  @Override
-  public void close() throws IOException {
+  @Override public void close() throws IOException {
     try {
       recordWriter.close(context);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3202cf51/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 7ee22ed..3d5c77c 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.datatype.StructField;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -379,9 +380,20 @@ public class CarbonWriterBuilder {
     }
     for (Field field : schema.getFields()) {
       if (null != field) {
-        tableSchemaBuilder.addColumn(
-            new StructField(field.getFieldName(), field.getDataType()),
-            sortColumnsList.contains(field.getFieldName()));
+        if (field.getChildren() != null && field.getChildren().size() > 0) {
+          // Loop through the inner columns and for a StructData
+          List<StructField> structFieldsArray =
+              new ArrayList<StructField>(field.getChildren().size());
+          String parentName = field.getFieldName();
+          for (StructField childFld : field.getChildren()) {
+            structFieldsArray.add(new StructField(childFld.getFieldName(), childFld.getDataType()));
+          }
+          DataType complexType = DataTypes.createStructType(structFieldsArray);
+          tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), complexType), false);
+        } else {
+          tableSchemaBuilder.addColumn(new StructField(field.getFieldName(), field.getDataType()),
+              sortColumnsList.contains(field.getFieldName()));
+        }
       }
     }
     String tableName;