You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/10 14:49:25 UTC

[3/5] incubator-carbondata git commit: Data load integration of all steps for removing kettle

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
index e841bc8..74633e2 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
@@ -24,11 +24,13 @@ import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.apache.spark.sql.common.util.CarbonHiveContext
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.QueryTest
+
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.load.CarbonLoadModel
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.processing.model.CarbonLoadModel
+
 /**
   * Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil
   *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
index e9c9471..1990070 100644
--- a/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
@@ -31,7 +31,8 @@ class LocalSQLContext(val hdfsCarbonBasePath: String)
   extends CarbonContext(new SparkContext(new SparkConf()
     .setAppName("CarbonSpark")
     .setMaster("local[2]")
-    .set("spark.sql.shuffle.partitions", "20")),
+    .set("spark.sql.shuffle.partitions", "20")
+    .set("use_kettle_default", "true")),
     hdfsCarbonBasePath,
     hdfsCarbonBasePath) {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a06133b..4bdd587 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,6 +109,7 @@
     <hadoop.version>2.2.0</hadoop.version>
     <scala.version>2.10.4</scala.version>
     <kettle.version>4.4.0-stable</kettle.version>
+    <use.kettle>true</use.kettle>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <spark.deps.scope>compile</spark.deps.scope>
     <scala.deps.scope>compile</scala.deps.scope>
@@ -336,6 +337,12 @@
       <id>include-all</id>
     </profile>
     <profile>
+      <id>no-kettle</id>
+      <properties>
+        <use.kettle>false</use.kettle>
+      </properties>
+    </profile>
+    <profile>
       <id>rat</id>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
index cc5c3a2..09e000f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
@@ -20,7 +20,6 @@
 package org.apache.carbondata.processing.csvload;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,7 +38,8 @@ import org.apache.carbondata.processing.constants.DataProcessorConstants;
 import org.apache.carbondata.processing.csvreaderstep.CsvInputMeta;
 import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
 import org.apache.carbondata.processing.etl.DataLoadingException;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordslogger;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.pentaho.di.core.KettleEnvironment;
 import org.pentaho.di.core.exception.KettleException;
@@ -97,7 +97,7 @@ public class DataGraphExecuter {
   private String[] getColumnNames(SchemaInfo schemaInfo, String tableName, String partitionId,
       CarbonDataLoadSchema schema) {
 
-    Set<String> columnNames = GraphExecutionUtil.getSchemaColumnNames(schema, tableName);
+    Set<String> columnNames = CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName);
 
     return columnNames.toArray(new String[columnNames.size()]);
   }
@@ -210,7 +210,7 @@ public class DataGraphExecuter {
     if (trans.getErrors() > 0) {
       LOGGER.error("Graph Execution had errors");
       throw new DataLoadingException("Due to internal errors, please check logs for more details.");
-    } else if (null != BadRecordslogger.hasBadRecord(key)) {
+    } else if (null != BadRecordsLogger.hasBadRecord(key)) {
       LOGGER.error("Graph Execution is partcially success");
       throw new DataLoadingException(DataProcessorConstants.BAD_REC_FOUND,
           "Graph Execution is partcially success");
@@ -417,34 +417,6 @@ public class DataGraphExecuter {
     trans.setLogLevel(LogLevel.NOTHING);
   }
 
-  private void validateHeader(SchemaInfo schemaInfo, String partitionId,
-      CarbonDataLoadSchema schema) throws DataLoadingException {
-    String[] columnNames = getColumnNames(schemaInfo, model.getTableName(), partitionId, schema);
-    String[] csvHeader = model.getCsvHeader().toLowerCase().split(",");
-
-    List<String> csvColumnsList = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-    for (String column : csvHeader) {
-      csvColumnsList.add(column.replaceAll("\"", "").trim());
-    }
-
-    int count = 0;
-
-    for (String columns : columnNames) {
-      if (csvColumnsList.contains(columns.toLowerCase())) {
-        count++;
-      }
-    }
-
-    if (count != columnNames.length) {
-      LOGGER.error("CSV header provided in DDL is not proper." +
-          " Column names in schema and CSV header are not the same.");
-      throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
-          "CSV header provided in DDL is not proper. Column names in schema and CSV header are " +
-              "not the same.");
-    }
-  }
-
   /**
    * This method will validate the both fact as well as dimension csv files.
    *
@@ -503,7 +475,14 @@ public class DataGraphExecuter {
       }
     } else if (model.isDirectLoad()) {
       if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
-        validateHeader(schemaInfo, partitionId, schema);
+        if (!CarbonDataProcessorUtil
+            .isHeaderValid(model.getTableName(), model.getCsvHeader(), schema, ",")) {
+          LOGGER.error("CSV header provided in DDL is not proper."
+              + " Column names in schema and CSV header are not the same.");
+          throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
+              "CSV header provided in DDL is not proper. Column names in schema and CSV header are "
+                  + "not the same.");
+        }
       } else {
         for (String file : model.getFilesToProcess()) {
           try {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
index 6d82bcd..1dd9bdf 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
@@ -34,8 +34,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema.DimensionRelation;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
@@ -194,65 +192,6 @@ public final class GraphExecutionUtil {
   }
 
   /**
-   * This method update the column Name
-   *
-   * @param table
-   * @param tableName
-   * @param schema
-   */
-  public static Set<String> getSchemaColumnNames(CarbonDataLoadSchema schema, String tableName) {
-    Set<String> columnNames = new HashSet<String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    String factTableName = schema.getCarbonTable().getFactTableName();
-    if (tableName.equals(factTableName)) {
-
-      List<CarbonDimension> dimensions =
-          schema.getCarbonTable().getDimensionByTableName(factTableName);
-
-      for (CarbonDimension dimension : dimensions) {
-
-        String foreignKey = null;
-        for (DimensionRelation dimRel : schema.getDimensionRelationList()) {
-          for (String field : dimRel.getColumns()) {
-            if (dimension.getColName().equals(field)) {
-              foreignKey = dimRel.getRelation().getFactForeignKeyColumn();
-              break;
-            }
-          }
-          if (null != foreignKey) {
-            break;
-          }
-        }
-        if (null == foreignKey) {
-          columnNames.add(dimension.getColName());
-        } else {
-          columnNames.add(foreignKey);
-        }
-      }
-
-      List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(factTableName);
-      for (CarbonMeasure msr : measures) {
-        if (!msr.getColumnSchema().isInvisible()) {
-          columnNames.add(msr.getColName());
-        }
-      }
-    } else {
-      List<CarbonDimension> dimensions = schema.getCarbonTable().getDimensionByTableName(tableName);
-      for (CarbonDimension dimension : dimensions) {
-        columnNames.add(dimension.getColName());
-      }
-
-      List<CarbonMeasure> measures = schema.getCarbonTable().getMeasureByTableName(tableName);
-      for (CarbonMeasure msr : measures) {
-        columnNames.add(msr.getColName());
-      }
-    }
-
-    return columnNames;
-
-  }
-
-  /**
    * @param csvFilePath
    * @param columnNames
    * @return

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 2faef7b..9843c2e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -26,8 +26,10 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.processing.newflow.complexobjects.ArrayObject;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
 
 import org.pentaho.di.core.exception.KettleException;
@@ -35,7 +37,7 @@ import org.pentaho.di.core.exception.KettleException;
 /**
  * Array DataType stateless object used in data loading
  */
-public class ArrayDataType implements GenericDataType {
+public class ArrayDataType implements GenericDataType<ArrayObject> {
 
   /**
    * child columns
@@ -177,7 +179,28 @@ public class ArrayDataType implements GenericDataType {
     }
   }
 
-  /*
+  @Override
+  public void writeByteArray(ArrayObject input, DataOutputStream dataOutputStream)
+      throws IOException, DictionaryGenerationException {
+    if (input == null) {
+      dataOutputStream.writeInt(1);
+      children.writeByteArray(null, dataOutputStream);
+    } else {
+      Object[] data = input.getData();
+      dataOutputStream.writeInt(data.length);
+      for (Object eachInput : data) {
+        children.writeByteArray(eachInput, dataOutputStream);
+      }
+    }
+  }
+
+  @Override
+  public void fillCardinality(List<Integer> dimCardWithComplex) {
+    dimCardWithComplex.add(0);
+    children.fillCardinality(dimCardWithComplex);
+  }
+
+  /**
    * parse byte array and bit pack
    */
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
index 0fdca6e..f8c765b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
@@ -35,7 +36,7 @@ import org.pentaho.di.core.exception.KettleException;
  * Generic DataType interface which will be used while data loading for complex types like Array &
  * Struct
  */
-public interface GenericDataType {
+public interface GenericDataType<T> {
 
   /**
    * @return name of the column
@@ -78,6 +79,14 @@ public interface GenericDataType {
       CarbonCSVBasedDimSurrogateKeyGen surrogateKeyGen) throws KettleException, IOException;
 
   /**
+   * writes to byte stream
+   * @param dataOutputStream
+   * @throws IOException
+   */
+  void writeByteArray(T input, DataOutputStream dataOutputStream)
+      throws IOException, DictionaryGenerationException;
+
+  /**
    * @return surrogateIndex for primitive column in complex type
    */
   int getSurrogateIndex();
@@ -151,4 +160,11 @@ public interface GenericDataType {
    * @param maxSurrogateKeyArray
    */
   void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
+
+  /**
+   * Fill the cardinality of the primitive datatypes
+   * @param dimCardWithComplex
+   */
+  void fillCardinality(List<Integer> dimCardWithComplex);
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 9199c51..610366c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -25,10 +25,22 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtilException;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary;
+import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
 
 import org.pentaho.di.core.exception.KettleException;
@@ -36,7 +48,7 @@ import org.pentaho.di.core.exception.KettleException;
 /**
  * Primitive DataType stateless object used in data loading
  */
-public class PrimitiveDataType implements GenericDataType {
+public class PrimitiveDataType implements GenericDataType<Object> {
 
   /**
    * surrogate index
@@ -78,6 +90,10 @@ public class PrimitiveDataType implements GenericDataType {
    */
   private int dataCounter;
 
+  private BiDictionary<Integer, Object> dictionaryGenerator;
+
+  private CarbonDimension carbonDimension;
+
   /**
    * constructor
    *
@@ -92,6 +108,36 @@ public class PrimitiveDataType implements GenericDataType {
     this.dimensionOrdinal = dimensionOrdinal;
   }
 
+  /**
+   * constructor
+   *
+   * @param name
+   * @param parentname
+   * @param columnId
+   */
+  public PrimitiveDataType(String name, String parentname, String columnId,
+      CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier) {
+    this.name = name;
+    this.parentname = parentname;
+    this.columnId = columnId;
+    this.carbonDimension = carbonDimension;
+    DictionaryColumnUniqueIdentifier identifier =
+        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+            carbonDimension.getColumnIdentifier(), carbonDimension.getDataType());
+    try {
+      if (carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(carbonDimension.getDataType()));
+      } else {
+        Dictionary dictionary = cache.get(identifier);
+        dictionaryGenerator = new PreCreatedDictionary(dictionary);
+      }
+    } catch (CarbonUtilException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /*
    * primitive column will not have any child column
    */
@@ -178,9 +224,30 @@ public class PrimitiveDataType implements GenericDataType {
     dataOutputStream.writeInt(surrogateKey);
   }
 
+  @Override public void writeByteArray(Object input, DataOutputStream dataOutputStream)
+      throws IOException, DictionaryGenerationException {
+    String parsedValue =
+        input == null ? null : DataTypeUtil.parseValue(input.toString(), carbonDimension);
+    Integer surrogateKey;
+    if (null == parsedValue) {
+      surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+    } else {
+      surrogateKey = dictionaryGenerator.getOrGenerateKey(parsedValue);
+      if (surrogateKey == CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+        surrogateKey = CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY;
+      }
+    }
+    dataOutputStream.writeInt(surrogateKey);
+  }
+
+  @Override
+  public void fillCardinality(List<Integer> dimCardWithComplex) {
+    dimCardWithComplex.add(dictionaryGenerator.size());
+  }
+
   /*
-   * parse bytearray and bit pack
-   */
+       * parse bytearray and bit pack
+       */
   @Override
   public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
       KeyGenerator[] generator) throws IOException, KeyGenException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index bcf18c8..f034895 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -26,8 +26,10 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.processing.newflow.complexobjects.StructObject;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
 
 import org.pentaho.di.core.exception.KettleException;
@@ -35,7 +37,7 @@ import org.pentaho.di.core.exception.KettleException;
 /**
  * Struct DataType stateless object used in data loading
  */
-public class StructDataType implements GenericDataType {
+public class StructDataType implements GenericDataType<StructObject> {
 
   /**
    * children columns
@@ -186,9 +188,38 @@ public class StructDataType implements GenericDataType {
     }
   }
 
+  @Override public void writeByteArray(StructObject input, DataOutputStream dataOutputStream)
+      throws IOException, DictionaryGenerationException {
+    dataOutputStream.writeInt(children.size());
+    if (input == null) {
+      dataOutputStream.writeInt(children.size());
+      for (int i = 0; i < children.size(); i++) {
+        children.get(i).writeByteArray(null, dataOutputStream);
+      }
+    } else {
+      Object[] data = input.getData();
+      for (int i = 0; i < data.length && i < children.size(); i++) {
+        children.get(i).writeByteArray(data[i], dataOutputStream);
+      }
+
+      // For other children elements which dont have data, write empty
+      for (int i = data.length; i < children.size(); i++) {
+        children.get(i).writeByteArray(null, dataOutputStream);
+      }
+    }
+  }
+
+  @Override
+  public void fillCardinality(List<Integer> dimCardWithComplex) {
+    dimCardWithComplex.add(0);
+    for (int i = 0; i < children.size(); i++) {
+      children.get(i).fillCardinality(dimCardWithComplex);
+    }
+  }
+
   /*
-   * parse bytearray and bit pack
-   */
+       * parse bytearray and bit pack
+       */
   @Override
   public void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
       KeyGenerator[] generator) throws IOException, KeyGenException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
index 57d74c3..a420d27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@ -363,7 +363,7 @@ public class MDKeyGenStep extends BaseStep {
     String carbonDataDirectoryPath = getCarbonDataFolderLocation();
     finalMerger = new SingleThreadFinalSortFilesMerger(dataFolderLocation, tableName,
         dimensionCount - meta.getComplexDimsCount(), meta.getComplexDimsCount(), measureCount,
-        meta.getNoDictionaryCount(), aggType, isNoDictionaryDimension);
+        meta.getNoDictionaryCount(), aggType, isNoDictionaryDimension, true);
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = getCarbonFactDataHandlerModel();
     carbonFactDataHandlerModel.setPrimitiveDimLens(simpleDimsLen);
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
new file mode 100644
index 0000000..10f5197
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.carbondata.processing.model;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+
+public class CarbonLoadModel implements Serializable {
+  /**
+   *
+   */
+  private static final long serialVersionUID = 6580168429197697465L;
+
+  private String databaseName;
+
+  private String tableName;
+
+  private String factFilePath;
+
+  private String dimFolderPath;
+
+  private String colDictFilePath;
+
+  private String partitionId;
+
+  private CarbonDataLoadSchema carbonDataLoadSchema;
+
+  private String[] aggTables;
+
+  private String aggTableName;
+
+  private boolean aggLoadRequest;
+
+  private String storePath;
+
+  private boolean isRetentionRequest;
+
+  private List<String> factFilesToProcess;
+  private String csvHeader;
+  private String csvDelimiter;
+  private String complexDelimiterLevel1;
+  private String complexDelimiterLevel2;
+
+  private boolean isDirectLoad;
+  private List<LoadMetadataDetails> loadMetadataDetails;
+
+  private String blocksID;
+
+  /**
+   *  Map from carbon dimension to pre defined dict file path
+   */
+  private HashMap<CarbonDimension, String> predefDictMap;
+
+  /**
+   * task id, each spark task has a unique id
+   */
+  private String taskNo;
+  /**
+   * new load start time
+   */
+  private String factTimeStamp;
+  /**
+   * load Id
+   */
+  private String segmentId;
+
+  private String allDictPath;
+
+  /**
+   * escape Char
+   */
+  private String escapeChar;
+
+  /**
+   * quote Char
+   */
+  private String quoteChar;
+
+  /**
+   * comment Char
+   */
+  private String commentChar;
+
+  private String dateFormat;
+
+  /**
+   * defines the string that should be treated as null while loadind data
+   */
+  private String serializationNullFormat;
+
+  /**
+   * defines the string to specify whether the bad record logger should be enabled or not
+   */
+  private String badRecordsLoggerEnable;
+
+  /**
+   * defines the option to specify the bad record logger action
+   */
+  private String badRecordsAction;
+
+  /**
+   * Max number of columns that needs to be parsed by univocity parser
+   */
+  private String maxColumns;
+
+  /**
+   * the key of RDD Iterator in RDD iterator Map
+   */
+  private String rddIteratorKey;
+
+  /**
+   * get escape char
+   * @return
+   */
+  public String getEscapeChar() {
+    return escapeChar;
+  }
+
+  /**
+   * set escape char
+   * @param escapeChar
+   */
+  public void setEscapeChar(String escapeChar) {
+    this.escapeChar = escapeChar;
+  }
+
+  /**
+   * get blocck id
+   *
+   * @return
+   */
+  public String getBlocksID() {
+    return blocksID;
+  }
+
+  /**
+   * set block id for carbon load model
+   *
+   * @param blocksID
+   */
+  public void setBlocksID(String blocksID) {
+    this.blocksID = blocksID;
+  }
+
+  public String getCsvDelimiter() {
+    return csvDelimiter;
+  }
+
+  public void setCsvDelimiter(String csvDelimiter) {
+    this.csvDelimiter = csvDelimiter;
+  }
+
+  public String getComplexDelimiterLevel1() {
+    return complexDelimiterLevel1;
+  }
+
+  public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
+    this.complexDelimiterLevel1 = complexDelimiterLevel1;
+  }
+
+  public String getComplexDelimiterLevel2() {
+    return complexDelimiterLevel2;
+  }
+
+  public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
+    this.complexDelimiterLevel2 = complexDelimiterLevel2;
+  }
+
+  public boolean isDirectLoad() {
+    return isDirectLoad;
+  }
+
+  public void setDirectLoad(boolean isDirectLoad) {
+    this.isDirectLoad = isDirectLoad;
+  }
+
+  public String getAllDictPath() {
+    return allDictPath;
+  }
+
+  public void setAllDictPath(String allDictPath) {
+    this.allDictPath = allDictPath;
+  }
+
+  public List<String> getFactFilesToProcess() {
+    return factFilesToProcess;
+  }
+
+  public void setFactFilesToProcess(List<String> factFilesToProcess) {
+    this.factFilesToProcess = factFilesToProcess;
+  }
+
+  public String getCsvHeader() {
+    return csvHeader;
+  }
+
+  public void setCsvHeader(String csvHeader) {
+    this.csvHeader = csvHeader;
+  }
+
+  public void initPredefDictMap() {
+    predefDictMap = new HashMap<>();
+  }
+
+  public String getPredefDictFilePath(CarbonDimension dimension) {
+    return predefDictMap.get(dimension);
+  }
+
+  public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) {
+    this.predefDictMap.put(dimension, predefDictFilePath);
+  }
+
+  /**
+   * @return carbon dataload schema
+   */
+  public CarbonDataLoadSchema getCarbonDataLoadSchema() {
+    return carbonDataLoadSchema;
+  }
+
+  /**
+   * @param carbonDataLoadSchema
+   */
+  public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) {
+    this.carbonDataLoadSchema = carbonDataLoadSchema;
+  }
+
+  /**
+   * @return the databaseName
+   */
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  /**
+   * @param databaseName the databaseName to set
+   */
+  public void setDatabaseName(String databaseName) {
+    this.databaseName = databaseName;
+  }
+
+  /**
+   * @return the tableName
+   */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
+   * @param tableName the tableName to set
+   */
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * @return the factFilePath
+   */
+  public String getFactFilePath() {
+    return factFilePath;
+  }
+
+  /**
+   * @param factFilePath the factFilePath to set
+   */
+  public void setFactFilePath(String factFilePath) {
+    this.factFilePath = factFilePath;
+  }
+
+  /**
+   *
+   * @return external column dictionary file path
+   */
+  public String getColDictFilePath() {
+    return colDictFilePath;
+  }
+
+  /**
+   * set external column dictionary file path
+   * @param colDictFilePath
+   */
+  public void setColDictFilePath(String colDictFilePath) {
+    this.colDictFilePath = colDictFilePath;
+  }
+
+  /**
+   * @return the dimFolderPath
+   */
+  public String getDimFolderPath() {
+    return dimFolderPath;
+  }
+
+  //TODO SIMIAN
+
+  /**
+   * @param dimFolderPath the dimFolderPath to set
+   */
+  public void setDimFolderPath(String dimFolderPath) {
+    this.dimFolderPath = dimFolderPath;
+  }
+
+  /**
+   * Creates a copy of this load model for the given partition.
+   * The copy's fact file path is this model's path suffixed with the unique id,
+   * and the unique id is used as the copy's partition id.
+   *
+   * @param uniqueId partition unique id
+   * @return a new CarbonLoadModel sharing this model's settings
+   */
+  public CarbonLoadModel getCopyWithPartition(String uniqueId) {
+    CarbonLoadModel copy = new CarbonLoadModel();
+    copy.tableName = tableName;
+    copy.dimFolderPath = dimFolderPath;
+    copy.factFilePath = factFilePath + '/' + uniqueId;
+    copy.databaseName = databaseName;
+    copy.partitionId = uniqueId;
+    copy.aggTables = aggTables;
+    copy.aggTableName = aggTableName;
+    copy.aggLoadRequest = aggLoadRequest;
+    copy.loadMetadataDetails = loadMetadataDetails;
+    copy.isRetentionRequest = isRetentionRequest;
+    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
+    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copy.carbonDataLoadSchema = carbonDataLoadSchema;
+    copy.blocksID = blocksID;
+    copy.taskNo = taskNo;
+    copy.factTimeStamp = factTimeStamp;
+    copy.segmentId = segmentId;
+    copy.serializationNullFormat = serializationNullFormat;
+    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copy.badRecordsAction = badRecordsAction;
+    copy.escapeChar = escapeChar;
+    copy.quoteChar = quoteChar;
+    copy.commentChar = commentChar;
+    // copy dateFormat too, for consistency with the direct-load overload below;
+    // the original omitted it, silently dropping the date format on partition copies
+    copy.dateFormat = dateFormat;
+    copy.maxColumns = maxColumns;
+    copy.storePath = storePath;
+    return copy;
+  }
+
+  /**
+   * Creates a copy of this load model for direct load of the given partition.
+   * The copy reads the supplied fact files directly (factFilePath is set to
+   * null and isDirectLoad to true) using the supplied header and delimiter.
+   *
+   * @param uniqueId partition unique id, also used as the copy's partition id
+   * @param filesForPartition fact files this partition should process
+   * @param header csv header for the copy
+   * @param delimiter csv delimiter for the copy
+   * @return a new CarbonLoadModel configured for direct load
+   */
+  public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
+      String header, String delimiter) {
+    CarbonLoadModel copyObj = new CarbonLoadModel();
+    copyObj.tableName = tableName;
+    copyObj.dimFolderPath = dimFolderPath;
+    copyObj.factFilePath = null;
+    copyObj.databaseName = databaseName;
+    copyObj.partitionId = uniqueId;
+    copyObj.aggTables = aggTables;
+    copyObj.aggTableName = aggTableName;
+    copyObj.aggLoadRequest = aggLoadRequest;
+    copyObj.loadMetadataDetails = loadMetadataDetails;
+    copyObj.isRetentionRequest = isRetentionRequest;
+    copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
+    copyObj.csvHeader = header;
+    copyObj.factFilesToProcess = filesForPartition;
+    copyObj.isDirectLoad = true;
+    copyObj.csvDelimiter = delimiter;
+    copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
+    copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
+    copyObj.blocksID = blocksID;
+    copyObj.taskNo = taskNo;
+    copyObj.factTimeStamp = factTimeStamp;
+    copyObj.segmentId = segmentId;
+    copyObj.serializationNullFormat = serializationNullFormat;
+    copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
+    copyObj.badRecordsAction = badRecordsAction;
+    copyObj.escapeChar = escapeChar;
+    copyObj.quoteChar = quoteChar;
+    copyObj.commentChar = commentChar;
+    copyObj.dateFormat = dateFormat;
+    copyObj.maxColumns = maxColumns;
+    copyObj.storePath = storePath;
+    return copyObj;
+  }
+
+  /**
+   * @return the partitionId
+   */
+  public String getPartitionId() {
+    return partitionId;
+  }
+
+  /**
+   * @param partitionId the partitionId to set
+   */
+  public void setPartitionId(String partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  /**
+   * @return the aggTables
+   */
+  public String[] getAggTables() {
+    return aggTables;
+  }
+
+  /**
+   * @param aggTables the aggTables to set
+   */
+  public void setAggTables(String[] aggTables) {
+    this.aggTables = aggTables;
+  }
+
+  /**
+   * @return the aggLoadRequest
+   */
+  public boolean isAggLoadRequest() {
+    return aggLoadRequest;
+  }
+
+  /**
+   * @param aggLoadRequest the aggLoadRequest to set
+   */
+  public void setAggLoadRequest(boolean aggLoadRequest) {
+    this.aggLoadRequest = aggLoadRequest;
+  }
+
+  /**
+   * @param storePath The storePath to set.
+   */
+  public void setStorePath(String storePath) {
+    this.storePath = storePath;
+  }
+
+  /**
+   * @return Returns the aggTableName.
+   */
+  public String getAggTableName() {
+    return aggTableName;
+  }
+
+  /**
+   * @return Returns the factStoreLocation.
+   */
+  public String getStorePath() {
+    return storePath;
+  }
+
+  /**
+   * @param aggTableName The aggTableName to set.
+   */
+  public void setAggTableName(String aggTableName) {
+    this.aggTableName = aggTableName;
+  }
+
+  /**
+   * isRetentionRequest
+   *
+   * @return
+   */
+  public boolean isRetentionRequest() {
+    return isRetentionRequest;
+  }
+
+  /**
+   * @param isRetentionRequest
+   */
+  public void setRetentionRequest(boolean isRetentionRequest) {
+    this.isRetentionRequest = isRetentionRequest;
+  }
+
+  /**
+   * getLoadMetadataDetails.
+   *
+   * @return
+   */
+  public List<LoadMetadataDetails> getLoadMetadataDetails() {
+    return loadMetadataDetails;
+  }
+
+  /**
+   * setLoadMetadataDetails.
+   *
+   * @param loadMetadataDetails
+   */
+  public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
+    this.loadMetadataDetails = loadMetadataDetails;
+  }
+
+  /**
+   * @return
+   */
+  public String getTaskNo() {
+    return taskNo;
+  }
+
+  /**
+   * @param taskNo
+   */
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  /**
+   * @return
+   */
+  public String getFactTimeStamp() {
+    return factTimeStamp;
+  }
+
+  /**
+   * @param factTimeStamp
+   */
+  public void setFactTimeStamp(String factTimeStamp) {
+    this.factTimeStamp = factTimeStamp;
+  }
+
+  public String[] getDelimiters() {
+    return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
+  }
+
+  /**
+   * @return load Id
+   */
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  /**
+   * @param segmentId
+   */
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  /**
+   * the method returns the value to be treated as null while data load
+   * @return
+   */
+  public String getSerializationNullFormat() {
+    return serializationNullFormat;
+  }
+
+  /**
+   * the method sets the value to be treated as null while data load
+   * @param serializationNullFormat
+   */
+  public void setSerializationNullFormat(String serializationNullFormat) {
+    this.serializationNullFormat = serializationNullFormat;
+  }
+
+  /**
+   * returns the string to enable bad record logger
+   * @return
+   */
+  public String getBadRecordsLoggerEnable() {
+    return badRecordsLoggerEnable;
+  }
+
+  /**
+   * Sets the option that specifies whether to enable or disable the bad record logger.
+   * @param badRecordsLoggerEnable
+   */
+  public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
+    this.badRecordsLoggerEnable = badRecordsLoggerEnable;
+  }
+
+  public String getQuoteChar() {
+    return quoteChar;
+  }
+
+  public void setQuoteChar(String quoteChar) {
+    this.quoteChar = quoteChar;
+  }
+
+  public String getCommentChar() {
+    return commentChar;
+  }
+
+  public void setCommentChar(String commentChar) {
+    this.commentChar = commentChar;
+  }
+
+  public String getDateFormat() { return dateFormat; }
+
+  public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; }
+
+  /**
+   * @return
+   */
+  public String getMaxColumns() {
+    return maxColumns;
+  }
+
+  /**
+   * @param maxColumns
+   */
+  public void setMaxColumns(String maxColumns) {
+    this.maxColumns = maxColumns;
+  }
+
+  /**
+   *  returns option to specify the bad record logger action
+   * @return
+   */
+  public String getBadRecordsAction() {
+    return badRecordsAction;
+  }
+
+  /**
+   * set option to specify the bad record logger action
+   * @param badRecordsAction
+   */
+  public void setBadRecordsAction(String badRecordsAction) {
+    this.badRecordsAction = badRecordsAction;
+  }
+
+  public String getRddIteratorKey() {
+    return rddIteratorKey;
+  }
+
+  public void setRddIteratorKey(String rddIteratorKey) {
+    this.rddIteratorKey = rddIteratorKey;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
index 3e6d63e..c5dc5d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
@@ -30,10 +30,16 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColu
  */
 public class DataField implements Serializable {
 
+  public DataField(CarbonColumn column) {
+    this.column = column;
+  }
+
   private CarbonColumn column;
 
   private CompressionCodec compressionCodec;
 
+  private String dateFormat;
+
   public boolean hasDictionaryEncoding() {
     return column.hasEncoding(Encoding.DICTIONARY);
   }
@@ -42,10 +48,6 @@ public class DataField implements Serializable {
     return column;
   }
 
-  public void setColumn(CarbonColumn column) {
-    this.column = column;
-  }
-
   public CompressionCodec getCompressionCodec() {
     return compressionCodec;
   }
@@ -54,4 +56,11 @@ public class DataField implements Serializable {
     this.compressionCodec = compressionCodec;
   }
 
+  public String getDateFormat() {
+    return dateFormat;
+  }
+
+  public void setDateFormat(String dateFormat) {
+    this.dateFormat = dateFormat;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
new file mode 100644
index 0000000..746e0f2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
+
+/**
+ * It executes the data load.
+ */
+public class DataLoadExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
+
+  public void execute(CarbonLoadModel loadModel, String storeLocation,
+      Iterator<Object[]>[] inputIterators) throws Exception {
+    AbstractDataLoadProcessorStep loadProcessorStep = null;
+    try {
+
+      loadProcessorStep =
+          new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
+      // 1. initialize
+      loadProcessorStep.initialize();
+      LOGGER.info("Data Loading is started for table " + loadModel.getTableName());
+      // 2. execute the step
+      loadProcessorStep.execute();
+    } catch (CarbonDataLoadingException e) {
+      throw e;
+    } catch (Exception e) {
+      LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
+      throw new CarbonDataLoadingException(
+          "Data Loading failed for table " + loadModel.getTableName(), e);
+    } finally {
+      if (loadProcessorStep != null) {
+        // 3. Close the step
+        loadProcessorStep.close();
+      }
+    }
+
+    String key =
+        new CarbonTableIdentifier(loadModel.getDatabaseName(), loadModel.getTableName(), null)
+            .getBadRecordLoggerKey();
+    if (null != BadRecordsLogger.hasBadRecord(key)) {
+      LOGGER.error("Data Load is partcially success for table " + loadModel.getTableName());
+      throw new BadRecordFoundException("Bad records found during load");
+    } else {
+      LOGGER.info("Data loading is successful for table "+loadModel.getTableName());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
new file mode 100644
index 0000000..92c677c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It builds the pipe line of steps for loading data to carbon:
+ * input parsing -> conversion -> sort -> write.
+ */
+public final class DataLoadProcessBuilder {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
+
+  /**
+   * Builds the chain of load processor steps for the given model.
+   *
+   * @param loadModel load configuration for the target table
+   * @param storeLocation temporary store location for this load
+   * @param inputIterators input data iterators
+   * @return the last step of the pipeline (executing it drives the whole chain)
+   */
+  public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
+      Iterator[] inputIterators) throws Exception {
+    CarbonDataLoadConfiguration configuration =
+        createConfiguration(loadModel, storeLocation);
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data like dictionary or non dictionary or complex objects depends on
+    // data types and configurations.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data which are part of key (all dimensions except complex types)
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    AbstractDataLoadProcessorStep writerProcessorStep =
+        new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+    return writerProcessorStep;
+  }
+
+  /**
+   * Creates the load configuration from the model: temp store path, header
+   * validation, load properties and the ordered data fields (dictionary and
+   * non-dictionary dimensions first, then complex types, then measures).
+   *
+   * @throws CarbonDataLoadingException when the CSV header does not match the schema
+   */
+  private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
+      String storeLocation) throws Exception {
+    // mkdirs() returns false when the directory already exists, so only treat
+    // the failure as an error when the path is really missing afterwards
+    File storeDir = new File(storeLocation);
+    if (!storeDir.exists() && !storeDir.mkdirs()) {
+      LOGGER.error("Error while creating the temp store path: " + storeLocation);
+    }
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
+
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setTableIdentifier(identifier);
+    // Header comes either from the DDL (csvHeader) or from the first fact file.
+    String csvHeader = loadModel.getCsvHeader();
+    String csvFileName = null;
+    if (csvHeader != null && !csvHeader.isEmpty()) {
+      configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ","));
+    } else {
+      CarbonFile csvFile =
+          CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
+      csvFileName = csvFile.getName();
+      csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile);
+      configuration.setHeader(
+          CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter()));
+    }
+    if (!CarbonDataProcessorUtil
+        .isHeaderValid(loadModel.getTableName(), csvHeader, loadModel.getCarbonDataLoadSchema(),
+            loadModel.getCsvDelimiter())) {
+      if (csvFileName == null) {
+        LOGGER.error("CSV header provided in DDL is not proper."
+            + " Column names in schema and CSV header are not the same.");
+        throw new CarbonDataLoadingException(
+            "CSV header provided in DDL is not proper. Column names in schema and CSV header are "
+                + "not the same.");
+      } else {
+        LOGGER.error(
+            "CSV File provided is not proper. Column names in schema and csv header are not same. "
+                + "CSVFile Name : " + csvFileName);
+        throw new CarbonDataLoadingException(
+            "CSV File provided is not proper. Column names in schema and csv header are not same. "
+                + "CSVFile Name : " + csvFileName);
+      }
+    }
+
+    configuration.setPartitionId(loadModel.getPartitionId());
+    configuration.setSegmentId(loadModel.getSegmentId());
+    configuration.setTaskNo(loadModel.getTaskNo());
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
+        new String[] { loadModel.getComplexDelimiterLevel1(),
+            loadModel.getComplexDelimiterLevel2() });
+    // These model values arrive as "key,value" pairs; index 1 is the value.
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
+        loadModel.getSerializationNullFormat().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
+        loadModel.getFactTimeStamp());
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE,
+        loadModel.getBadRecordsLoggerEnable().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
+        loadModel.getBadRecordsAction().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
+        loadModel.getFactFilePath());
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    Map<String, String> dateFormatMap =
+        CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
+    List<DataField> dataFields = new ArrayList<>();
+    List<DataField> complexDataFields = new ArrayList<>();
+
+    // First add dictionary and non dictionary dimensions because these are part of mdk key.
+    // And then add complex data types and measures.
+    for (CarbonColumn column : dimensions) {
+      DataField dataField = new DataField(column);
+      dataField.setDateFormat(dateFormatMap.get(column.getColName()));
+      if (column.isComplex()) {
+        complexDataFields.add(dataField);
+      } else {
+        dataFields.add(dataField);
+      }
+    }
+    dataFields.addAll(complexDataFields);
+    for (CarbonColumn column : measures) {
+      // This dummy measure is added when no measure was present. We no need to load it.
+      if (!(column.getColName().equals("default_dummy_measure"))) {
+        dataFields.add(new DataField(column));
+      }
+    }
+    configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
+    return configuration;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
index 15f5b0e..958fd1a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
@@ -34,4 +34,12 @@ public final class DataLoadProcessorConstants {
 
   public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
 
+  public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT";
+
+  public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE";
+
+  public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
+
+  public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
new file mode 100644
index 0000000..2455392
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter;
+
+/**
+ * It is holder for reason of bad records: carries the reason text and a flag
+ * that marks whether a bad record was recorded since the last clear().
+ */
+public class BadRecordLogHolder {
+
+  // reason of the most recent bad record; NOTE(review): not reset by clear()
+  private String reason;
+
+  // set to true by setReason(), reset to false by clear()
+  private boolean badRecordAdded;
+
+  public String getReason() {
+    return reason;
+  }
+
+  /**
+   * Records the reason for a bad record and marks that a bad record was added.
+   * @param reason description of why the record is bad
+   */
+  public void setReason(String reason) {
+    this.reason = reason;
+    badRecordAdded = true;
+  }
+
+  /**
+   * NOTE(review): despite the negated name, this returns the raw flag — i.e.
+   * true when a bad record HAS been added — confirm callers expect this.
+   */
+  public boolean isBadRecordNotAdded() {
+    return badRecordAdded;
+  }
+
+  /**
+   * Resets the bad-record flag. The reason text is left as-is —
+   * presumably intentional so the last reason stays readable; TODO confirm.
+   */
+  public void clear() {
+    this.badRecordAdded = false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
index e304fbc..8dda65d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
@@ -30,7 +30,9 @@ public interface FieldConverter {
   /**
    * It converts the column field and updates the data in same location/index in row.
    * @param row
+   * @return the status whether it could be loaded or not, usually when record is added
+   * to bad records then it returns false.
    * @throws CarbonDataLoadingException
    */
-  void convert(CarbonRow row) throws CarbonDataLoadingException;
+  void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
index 44f1116..3b199ab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
@@ -26,7 +26,11 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow;
  */
 public interface RowConverter {
 
+  void initialize();
+
   CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
 
+  RowConverter createCopyForNewThread();
+
   void finish();
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
index 70a900c..790a970 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
@@ -18,10 +18,12 @@
  */
 package org.apache.carbondata.processing.newflow.converter.impl;
 
+import java.util.List;
+
 import org.apache.carbondata.processing.newflow.converter.FieldConverter;
 
 public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter {
 
-  public abstract int getColumnCardinality();
+  public abstract void fillColumnCardinality(List<Integer> cardinality);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
index 4c18aa7..4b7aa40 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
@@ -19,12 +19,42 @@
 
 package org.apache.carbondata.processing.newflow.converter.impl;
 
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.List;
+
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 
-public class ComplexFieldConverterImpl implements FieldConverter {
+public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+  private GenericDataType genericDataType;
+
+  private int index;
+
+  public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) {
+    this.genericDataType = genericDataType;
+    this.index = index;
+  }
 
   @Override
-  public void convert(CarbonRow row) {
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+    Object object = row.getObject(index);
+    // TODO Its temporary, needs refactor here.
+    ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+    try {
+      genericDataType.writeByteArray(object, dataOutputStream);
+      dataOutputStream.close();
+      row.update(byteArray.toByteArray(), index);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(object+"", e);
+    }
+  }
+
+  @Override public void fillColumnCardinality(List<Integer> cardinality) {
+    genericDataType.fillCardinality(cardinality);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
index 8ca4ff2..3182a37 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
@@ -19,16 +19,22 @@
 
 package org.apache.carbondata.processing.newflow.converter.impl;
 
+import java.util.List;
+
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.util.CarbonUtilException;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
@@ -38,14 +44,20 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
 
-  private BiDictionary<Integer, String> dictionaryGenerator;
+  private BiDictionary<Integer, Object> dictionaryGenerator;
 
   private int index;
 
+  private CarbonDimension carbonDimension;
+
+  private String nullFormat;
+
   public DictionaryFieldConverterImpl(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, int index) {
+      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index) {
     this.index = index;
+    this.carbonDimension = (CarbonDimension) dataField.getColumn();
+    this.nullFormat = nullFormat;
     DictionaryColumnUniqueIdentifier identifier =
         new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
             dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType());
@@ -58,17 +70,22 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
     }
   }
 
-  @Override
-  public void convert(CarbonRow row) throws CarbonDataLoadingException {
+  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
     try {
-      row.update(dictionaryGenerator.getOrGenerateKey(row.getString(index)), index);
+      String parsedValue = DataTypeUtil.parseValue(row.getString(index), carbonDimension);
+      if(null == parsedValue || parsedValue.equals(nullFormat)) {
+        row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index);
+      } else {
+        row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index);
+      }
     } catch (DictionaryGenerationException e) {
       throw new CarbonDataLoadingException(e);
     }
   }
 
   @Override
-  public int getColumnCardinality() {
-    return dictionaryGenerator.size();
+  public void fillColumnCardinality(List<Integer> cardinality) {
+    cardinality.add(dictionaryGenerator.size());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
index 8ff110a..c8113ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -19,9 +19,13 @@
 
 package org.apache.carbondata.processing.newflow.converter.impl;
 
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 
 public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
@@ -30,21 +34,50 @@ public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldC
 
   private int index;
 
-  public DirectDictionaryFieldConverterImpl(DataField dataField, int index) {
-    DirectDictionaryGenerator directDictionaryGenerator =
-        DirectDictionaryKeyGeneratorFactory
-            .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
-    this.directDictionaryGenerator = directDictionaryGenerator;
+  private String nullFormat;
+
+  private CarbonColumn column;
+
+  public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index) {
+    this.nullFormat = nullFormat;
+    this.column = dataField.getColumn();
+    if (dataField.getDateFormat() != null && !dataField.getDateFormat().isEmpty()) {
+      this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(dataField.getColumn().getDataType(),
+              dataField.getDateFormat());
+
+    } else {
+      this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
+    }
     this.index = index;
   }
 
   @Override
-  public void convert(CarbonRow row) {
-    row.update(directDictionaryGenerator.generateDirectSurrogateKey(row.getString(index)), index);
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+    String value = row.getString(index);
+    if (value == null) {
+      logHolder.setReason(
+          "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
+              .getColName() + " and column data type " + column.getDataType() + " is not a valid "
+              + column.getDataType() + " type.");
+      row.update(1, index);
+    } else if (value.equals(nullFormat)) {
+      row.update(1, index);
+    } else {
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
+      if (key == 1) {
+        logHolder.setReason(
+            "The value " + " \"" + row.getString(index) + "\"" + " with column name " + column
+                .getColName() + " and column data type " + column.getDataType() + " is not a valid "
+                + column.getDataType() + " type.");
+      }
+      row.update(key, index);
+    }
   }
 
   @Override
-  public int getColumnCardinality() {
-    return Integer.MAX_VALUE;
+  public void fillColumnCardinality(List<Integer> cardinality) {
+    cardinality.add(Integer.MAX_VALUE);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index a10ad20..a46b9ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -18,11 +18,19 @@
  */
 package org.apache.carbondata.processing.newflow.converter.impl;
 
+import java.util.List;
+
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
+import org.apache.carbondata.processing.datatypes.StructDataType;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.converter.FieldConverter;
 
@@ -43,27 +51,83 @@ public class FieldEncoderFactory {
 
   /**
    * Creates the FieldConverter for all dimensions, for measures return null.
-   * @param dataField column schema
-   * @param cache dicionary cache.
+   *
+   * @param dataField             column schema
+   * @param cache                 dicionary cache.
    * @param carbonTableIdentifier table identifier
-   * @param index index of column in the row.
+   * @param index                 index of column in the row.
    * @return
    */
   public FieldConverter createFieldEncoder(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, int index) {
+      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat) {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimesion()) {
-      if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY)) {
-        return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, index);
-      } else if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        return new DirectDictionaryFieldConverterImpl(dataField, index);
+      if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+          !dataField.getColumn().isComplex()) {
+        return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index);
+      } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
+          !dataField.getColumn().isComplex()) {
+        return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
+            index);
       } else if (dataField.getColumn().isComplex()) {
-        return new ComplexFieldConverterImpl();
+        return new ComplexFieldConverterImpl(
+            createComplexType(dataField, cache, carbonTableIdentifier), index);
       } else {
-        return new NonDictionaryFieldConverterImpl(dataField, index);
+        return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
       }
+    } else {
+      return new MeasureFieldConverterImpl(dataField, nullFormat, index);
+    }
+  }
+
+  /**
+   * Create parser for the carbon column.
+   */
+  private static GenericDataType createComplexType(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier) {
+    return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
+        carbonTableIdentifier);
+  }
+
+  /**
+   * This method may be called recursively if the carbon column is complex type.
+   *
+   * @return GenericDataType
+   */
+  private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier) {
+    switch (carbonColumn.getDataType()) {
+      case ARRAY:
+        List<CarbonDimension> listOfChildDimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create array parser with complex delimiter
+        ArrayDataType arrayDataType =
+            new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+        for (CarbonDimension dimension : listOfChildDimensions) {
+          arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+              carbonTableIdentifier));
+        }
+        return arrayDataType;
+      case STRUCT:
+        List<CarbonDimension> dimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create struct parser with complex delimiter
+        StructDataType structDataType =
+            new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+        for (CarbonDimension dimension : dimensions) {
+          structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+              carbonTableIdentifier));
+        }
+        return structDataType;
+      case MAP:
+        throw new UnsupportedOperationException("Complex type Map is not supported yet");
+      default:
+        return new PrimitiveDataType(carbonColumn.getColName(), parentName,
+            carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
+            carbonTableIdentifier);
     }
-    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
new file mode 100644
index 0000000..c419d46
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.converter.impl;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.newflow.converter.FieldConverter;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+
+/**
+ * Converter for measure
+ */
+public class MeasureFieldConverterImpl implements FieldConverter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
+
+  private int index;
+
+  private DataType dataType;
+
+  private CarbonMeasure measure;
+
+  private String nullformat;
+
+  public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index) {
+    this.dataType = dataField.getColumn().getDataType();
+    this.measure = (CarbonMeasure) dataField.getColumn();
+    this.nullformat = nullformat;
+    this.index = index;
+  }
+
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
+    String value = row.getString(index);
+    Object output;
+    if (value == null) {
+      logHolder.setReason(
+          "The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
+              + " and column data type " + dataType + " is not a valid " + dataType + " type.");
+      row.update(null, index);
+    } else if(value.equals(nullformat)) {
+      row.update(null, index);
+    } else {
+      try {
+        output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+        row.update(output, index);
+      } catch (NumberFormatException e) {
+        LOGGER.warn(
+            "Cant not convert : " + value + " to Numeric type value. Value considered as null.");
+        logHolder.setReason(
+            "The value " + " \"" + value + "\"" + " with column name " + measure.getColName()
+                + " and column data type " + dataType + " is not a valid " + dataType + " type.");
+        output = null;
+        row.update(output, index);
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
index 9540907..c90f1ba 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -21,9 +21,11 @@ package org.apache.carbondata.processing.newflow.converter.impl;
 import java.nio.charset.Charset;
 
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.newflow.converter.FieldConverter;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 
@@ -33,17 +35,29 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
 
   private int index;
 
-  public NonDictionaryFieldConverterImpl(DataField dataField, int index) {
+  private String nullformat;
+
+  private CarbonColumn column;
+
+  public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index) {
     this.dataType = dataField.getColumn().getDataType();
+    this.column = dataField.getColumn();
     this.index = index;
+    this.nullformat = nullformat;
   }
 
   @Override
-  public void convert(CarbonRow row) {
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
     String dimensionValue = row.getString(index);
+    if (dimensionValue == null || dimensionValue.equals(nullformat)) {
+      dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+    }
     if (dataType != DataType.STRING) {
       if (null == DataTypeUtil.normalizeIntAndLongValues(dimensionValue, dataType)) {
-        dimensionValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+        logHolder.setReason(
+            "The value " + " \"" + dimensionValue + "\"" + " with column name " + column
+                .getColName() + " and column data type " + dataType + " is not a valid " + dataType
+                + " type.");
       }
     }
     row.update(dimensionValue.getBytes(Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)),