Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:25 UTC

[38/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
new file mode 100644
index 0000000..5c81bb9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+
+/**
+ * Metadata class for each column of a table.
+ */
+public class DataField implements Serializable {
+
+  public DataField(CarbonColumn column) {
+    this.column = column;
+  }
+
+  private CarbonColumn column;
+
+  private String dateFormat;
+
+  public boolean hasDictionaryEncoding() {
+    return column.hasEncoding(Encoding.DICTIONARY);
+  }
+
+  public CarbonColumn getColumn() {
+    return column;
+  }
+
+  public String getDateFormat() {
+    return dateFormat;
+  }
+
+  public void setDateFormat(String dateFormat) {
+    this.dateFormat = dateFormat;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
new file mode 100644
index 0000000..10b19b7
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadExecutor.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.exception.NoRetryException;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+
+/**
+ * It executes the data load.
+ */
+public class DataLoadExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
+
+  private AbstractDataLoadProcessorStep loadProcessorStep;
+
+  private boolean isClosed;
+
+  public void execute(CarbonLoadModel loadModel, String[] storeLocation,
+      CarbonIterator<Object[]>[] inputIterators) throws Exception {
+    try {
+      loadProcessorStep =
+          new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
+      // 1. initialize
+      loadProcessorStep.initialize();
+      LOGGER.info("Data Loading is started for table " + loadModel.getTableName());
+      // 2. execute the step
+      loadProcessorStep.execute();
+      // check whether any bad record key is present in the bad record entry logger static map
+      if (badRecordFound(
+          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())) {
+        LOGGER.error("Data Load is partially success for table " + loadModel.getTableName());
+      } else {
+        LOGGER.info("Data loading is successful for table " + loadModel.getTableName());
+      }
+    } catch (CarbonDataLoadingException e) {
+      if (e instanceof BadRecordFoundException) {
+        throw new NoRetryException(e.getMessage());
+      } else {
+        throw e;
+      }
+    } catch (Exception e) {
+      LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
+      throw new CarbonDataLoadingException(
+          "Data Loading failed for table " + loadModel.getTableName(), e);
+    } finally {
+      removeBadRecordKey(
+          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier());
+    }
+  }
+
+  /**
+   * This method checks whether a bad record entry exists for the given table
+   *
+   * @param carbonTableIdentifier table identifier used to derive the bad record logger key
+   * @return true if a bad record entry exists for the table
+   */
+  private boolean badRecordFound(CarbonTableIdentifier carbonTableIdentifier) {
+    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
+    boolean badRecordKeyFound = false;
+    if (null != BadRecordsLogger.hasBadRecord(badRecordLoggerKey)) {
+      badRecordKeyFound = true;
+    }
+    return badRecordKeyFound;
+  }
+
+  /**
+   * This method will remove the bad record key from the bad record logger
+   *
+   * @param carbonTableIdentifier table identifier used to derive the bad record logger key
+   */
+  private void removeBadRecordKey(CarbonTableIdentifier carbonTableIdentifier) {
+    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
+    BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey);
+  }
+
+  /**
+   * Method to clean up all the resources
+   */
+  public void close() {
+    if (!isClosed && loadProcessorStep != null) {
+      loadProcessorStep.close();
+    }
+    isClosed = true;
+  }
+}
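
For orientation, here is a minimal usage sketch of the DataLoadExecutor added above. It is not part of this commit; the load model, temporary store locations and input iterators are assumed to be prepared elsewhere by the caller of the processing module.

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

public final class DataLoadExecutorSketch {

  // Hypothetical helper: loadModel, storeLocations and inputIterators are assumed
  // to be prepared by the caller (for example by an integration layer).
  public static void runLoad(CarbonLoadModel loadModel, String[] storeLocations,
      CarbonIterator<Object[]>[] inputIterators) throws Exception {
    DataLoadExecutor executor = new DataLoadExecutor();
    try {
      // builds the step pipeline, runs it and logs success / partial success
      executor.execute(loadModel, storeLocations, inputIterators);
    } finally {
      // releases the resources held by the underlying processor steps
      executor.close();
    }
  }
}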

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
new file mode 100644
index 0000000..05104a2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.loading.steps.CarbonRowDataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataConverterProcessorWithBucketingStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * It builds the pipeline of steps for loading data into carbon.
+ */
+public final class DataLoadProcessBuilder {
+
+  public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] storeLocation,
+      CarbonIterator[] inputIterators) throws Exception {
+    CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
+    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
+    if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
+      return buildInternalForNoSort(inputIterators, configuration);
+    } else if (configuration.getBucketingInfo() != null) {
+      return buildInternalForBucketing(inputIterators, configuration);
+    } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
+      return buildInternalForBatchSort(inputIterators, configuration);
+    } else {
+      return buildInternal(inputIterators, configuration);
+    }
+  }
+
+  private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data (dictionary, non-dictionary or complex objects) depending on
+    // the data types and configuration.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data by SortColumn
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+  }
+
+  private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data (dictionary, non-dictionary or complex objects) depending on
+    // the data types and configuration.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Writes the converted data in carbondata format.
+    AbstractDataLoadProcessorStep writerProcessorStep =
+        new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
+    return writerProcessorStep;
+  }
+
+  private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data (dictionary, non-dictionary or complex objects) depending on
+    // the data types and configuration.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data by SortColumn, depending on the configured sort scope
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
+  }
+
+  private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators,
+      CarbonDataLoadConfiguration configuration) throws Exception {
+    // 1. Reads the data input iterators and parses the data.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepImpl(configuration, inputIterators);
+    // 2. Converts the data (dictionary, non-dictionary or complex objects) depending on
+    // the data types and configuration.
+    AbstractDataLoadProcessorStep converterProcessorStep =
+        new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
+    // 3. Sorts the data by SortColumn, depending on the configured sort scope
+    AbstractDataLoadProcessorStep sortProcessorStep =
+        new SortProcessorStepImpl(configuration, converterProcessorStep);
+    // 4. Writes the sorted data in carbondata format.
+    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+  }
+
+  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
+      String[] storeLocation) {
+    CarbonDataProcessorUtil.createLocations(storeLocation);
+
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = CarbonDataProcessorUtil
+        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+            loadModel.getTaskNo(), false, false);
+    CarbonProperties.getInstance().addProperty(tempLocationKey,
+        StringUtils.join(storeLocation, File.pathSeparator));
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
+
+    return createConfiguration(loadModel);
+  }
+
+  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel) {
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setTableIdentifier(identifier);
+    configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
+    configuration.setHeader(loadModel.getCsvHeaderColumns());
+    configuration.setPartitionId(loadModel.getPartitionId());
+    configuration.setSegmentId(loadModel.getSegmentId());
+    configuration.setTaskNo(loadModel.getTaskNo());
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
+        new String[] { loadModel.getComplexDelimiterLevel1(),
+            loadModel.getComplexDelimiterLevel2() });
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
+        loadModel.getSerializationNullFormat().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
+        loadModel.getFactTimeStamp());
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE,
+        loadModel.getBadRecordsLoggerEnable().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
+        loadModel.getBadRecordsAction().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD,
+        loadModel.getIsEmptyDataBadRecord().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
+        loadModel.getFactFilePath());
+    configuration
+        .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
+    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+        loadModel.getBatchSortSizeInMb());
+    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+        loadModel.getGlobalSortPartitions());
+    configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+        loadModel.getBadRecordsLocation());
+    CarbonMetadata.getInstance().addCarbonTable(carbonTable);
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    Map<String, String> dateFormatMap =
+        CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
+    List<DataField> dataFields = new ArrayList<>();
+    List<DataField> complexDataFields = new ArrayList<>();
+
+    // First add dictionary and non-dictionary dimensions because these are part of the MDK key.
+    // Then add complex data types and measures.
+    for (CarbonColumn column : dimensions) {
+      DataField dataField = new DataField(column);
+      dataField.setDateFormat(dateFormatMap.get(column.getColName()));
+      if (column.isComplex()) {
+        complexDataFields.add(dataField);
+      } else {
+        dataFields.add(dataField);
+      }
+    }
+    dataFields.addAll(complexDataFields);
+    for (CarbonColumn column : measures) {
+      // This dummy measure is added when no measure was present. We do not need to load it.
+      if (!(column.getColName().equals("default_dummy_measure"))) {
+        dataFields.add(new DataField(column));
+      }
+    }
+    configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
+    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+    // configuration for one pass load: dictionary server info
+    configuration.setUseOnePass(loadModel.getUseOnePass());
+    configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
+    configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
+    configuration.setPreFetch(loadModel.isPreFetch());
+    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
+    configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
+
+    TableSpec tableSpec = new TableSpec(dimensions, measures);
+    configuration.setTableSpec(tableSpec);
+    return configuration;
+  }
+
+}
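
The same pipeline can also be driven without DataLoadExecutor. The sketch below is not from this commit; it mirrors the life cycle that DataLoadExecutor.execute() performs internally (build the step chain, initialize it, run it, close it), with the arguments assumed to be prepared by the caller.

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.DataLoadProcessBuilder;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

public final class PipelineSketch {

  // Hypothetical helper mirroring what DataLoadExecutor.execute() does internally.
  public static void runPipeline(CarbonLoadModel loadModel, String[] storeLocations,
      CarbonIterator<Object[]>[] inputIterators) throws Exception {
    AbstractDataLoadProcessorStep rootStep =
        new DataLoadProcessBuilder().build(loadModel, storeLocations, inputIterators);
    try {
      rootStep.initialize();   // initializes every step in the chain
      rootStep.execute();      // pulls rows through input -> convert -> (sort) -> write
    } finally {
      rootStep.close();
    }
  }
}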

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java b/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java
new file mode 100644
index 0000000..6e5f91a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/FailureCauses.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading;
+
+/**
+ * This enum is used to determine the reason of a failure.
+ */
+public enum FailureCauses {
+  NONE,
+  BAD_RECORDS,
+  EXECUTOR_FAILURE,
+  STATUS_FILE_UPDATION_FAILURE,
+  MULTIPLE_INPUT_ROWS_MATCHING
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java
new file mode 100644
index 0000000..ecd46cc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/ArrayObject.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.complexobjects;
+
+public class ArrayObject {
+
+  private Object[] data;
+
+  public ArrayObject(Object[] data) {
+    this.data = data;
+  }
+
+  public Object[] getData() {
+    return data;
+  }
+
+  public void setData(Object[] data) {
+    this.data = data;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java
new file mode 100644
index 0000000..c026a48
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/complexobjects/StructObject.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.complexobjects;
+
+public class StructObject {
+
+  private Object[] data;
+
+  public StructObject(Object[] data) {
+    this.data = data;
+  }
+
+  public Object[] getData() {
+    return data;
+  }
+
+  public void setData(Object[] data) {
+    this.data = data;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
new file mode 100644
index 0000000..260661b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/constants/DataLoadProcessorConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.constants;
+
+/**
+ * Constants used in data loading.
+ */
+public final class DataLoadProcessorConstants {
+
+  public static final String FACT_TIME_STAMP = "FACT_TIME_STAMP";
+
+  public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
+
+  public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT";
+
+  public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE";
+
+  public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
+
+  public static final String IS_EMPTY_DATA_BAD_RECORD = "IS_EMPTY_DATA_BAD_RECORD";
+
+  public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java
new file mode 100644
index 0000000..aeb4d15
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/BadRecordLogHolder.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * It is a holder for the reason of bad records.
+ */
+public class BadRecordLogHolder {
+
+  /**
+   * this map holds the unified bad record message for each column
+   */
+  private Map<String, String> columnMessageMap = new HashMap<>();
+
+  private String reason;
+
+  private boolean badRecordAdded;
+
+  private boolean isLogged;
+
+  public String getReason() {
+    return reason;
+  }
+
+  public void setReason(String reason) {
+    this.reason = reason;
+    badRecordAdded = true;
+  }
+
+  public boolean isBadRecordNotAdded() {
+    return badRecordAdded;
+  }
+
+  public void clear() {
+    this.badRecordAdded = false;
+  }
+
+  public boolean isLogged() {
+    return isLogged;
+  }
+
+  public void setLogged(boolean logged) {
+    isLogged = logged;
+  }
+
+  public Map<String, String> getColumnMessageMap() {
+    return columnMessageMap;
+  }
+
+  /**
+   * this method will clear the map entries
+   */
+  public void finish() {
+    if (null != columnMessageMap) {
+      columnMessageMap.clear();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java
new file mode 100644
index 0000000..aa84fc3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/DictionaryCardinalityFinder.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter;
+
+/**
+ * Finds the current cardinality of dimensions.
+ */
+public interface DictionaryCardinalityFinder {
+
+  int[] getCardinality();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
new file mode 100644
index 0000000..8a3e2eb
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/FieldConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+/**
+ * This interface converts/transforms the column field.
+ */
+public interface FieldConverter {
+
+  /**
+   * It converts the column field and updates the data at the same location/index in the row.
+   * If the value cannot be converted, the failure reason is recorded in the given log holder.
+   * @param row row whose field at the converter's index is converted in place
+   * @param logHolder holder used to record the bad record reason, if any
+   * @throws CarbonDataLoadingException
+   */
+  void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException;
+}
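
To make the contract concrete, here is a hypothetical converter that is not part of this commit; it trims a string column in place and relies only on CarbonRow.getString/update and BadRecordLogHolder.setReason, which the converters later in this commit also use.

import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
import org.apache.carbondata.processing.loading.converter.FieldConverter;

// Hypothetical example converter, for illustration only.
public class TrimFieldConverter implements FieldConverter {

  private final int index;   // position of the column within the row

  public TrimFieldConverter(int index) {
    this.index = index;
  }

  @Override
  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
    String value = row.getString(index);
    if (value == null) {
      // record the failure reason; the caller decides how to handle the bad record
      logHolder.setReason("column at index " + index + " is null");
      return;
    }
    // update the value at the same location/index, as the interface requires
    row.update(value.trim(), index);
  }
}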

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
new file mode 100644
index 0000000..fd3a650
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/RowConverter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+/**
+ * Converts the row during data loading.
+ */
+public interface RowConverter extends DictionaryCardinalityFinder {
+
+  void initialize() throws IOException;
+
+  CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
+
+  RowConverter createCopyForNewThread();
+
+  void finish();
+}
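
A trivial, hypothetical implementation (not in this commit) makes the interface contract concrete: a lifecycle of initialize/finish, per-thread copies, and cardinality reporting inherited from DictionaryCardinalityFinder.

import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.processing.loading.converter.RowConverter;

// Hypothetical pass-through converter, for illustration only.
public class PassThroughRowConverter implements RowConverter {

  @Override public void initialize() {
    // nothing to set up for a pass-through conversion
  }

  @Override public CarbonRow convert(CarbonRow row) {
    return row;   // a real converter would transform each field of the row here
  }

  @Override public RowConverter createCopyForNewThread() {
    return new PassThroughRowConverter();   // stateless, so a fresh instance is enough
  }

  @Override public void finish() {
    // nothing to release
  }

  @Override public int[] getCardinality() {
    return new int[0];   // no dictionary columns handled by this converter
  }
}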

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..5349e33
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/AbstractDictionaryFieldConverterImpl.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+
+public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter {
+
+  public abstract void fillColumnCardinality(List<Integer> cardinality);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
new file mode 100644
index 0000000..5ac832d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/ComplexFieldConverterImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+  private GenericDataType genericDataType;
+
+  private int index;
+
+  public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) {
+    this.genericDataType = genericDataType;
+    this.index = index;
+  }
+
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+    Object object = row.getObject(index);
+    // TODO: this is temporary; needs refactoring here.
+    ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
+    try {
+      genericDataType.writeByteArray(object, dataOutputStream);
+      dataOutputStream.close();
+      row.update(byteArray.toByteArray(), index);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(object + "", e);
+    }
+  }
+
+  @Override public void fillColumnCardinality(List<Integer> cardinality) {
+    genericDataType.fillCardinality(cardinality);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..2671393
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DictionaryFieldConverterImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
+import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
+
+  private BiDictionary<Integer, Object> dictionaryGenerator;
+
+  private int index;
+
+  private CarbonDimension carbonDimension;
+
+  private String nullFormat;
+
+  private Dictionary dictionary;
+
+  private DictionaryMessage dictionaryMessage;
+
+  private boolean isEmptyBadRecord;
+
+  public DictionaryFieldConverterImpl(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
+      DictionaryClient client, boolean useOnePass, String storePath,
+      Map<Object, Integer> localCache, boolean isEmptyBadRecord) throws IOException {
+    this.index = index;
+    this.carbonDimension = (CarbonDimension) dataField.getColumn();
+    this.nullFormat = nullFormat;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+    DictionaryColumnUniqueIdentifier identifier =
+        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
+            dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(),
+            CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
+
+    // if use one pass, use DictionaryServerClientDictionary
+    if (useOnePass) {
+      if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+        dictionary = cache.get(identifier);
+      }
+      dictionaryMessage = new DictionaryMessage();
+      dictionaryMessage.setColumnName(dataField.getColumn().getColName());
+      // for table initialization
+      dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
+      dictionaryMessage.setData("0");
+      // for generate dictionary
+      dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
+      dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
+          dictionaryMessage, localCache);
+    } else {
+      dictionary = cache.get(identifier);
+      dictionaryGenerator = new PreCreatedDictionary(dictionary);
+    }
+  }
+
+  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
+    try {
+      String parsedValue = null;
+      String dimensionValue = row.getString(index);
+      if (dimensionValue == null || dimensionValue.equals(nullFormat)) {
+        parsedValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
+      } else {
+        parsedValue = DataTypeUtil.parseValue(dimensionValue, carbonDimension);
+      }
+      if (null == parsedValue) {
+        if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+          String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
+          if (null == message) {
+            message = CarbonDataProcessorUtil.prepareFailureReason(
+                carbonDimension.getColName(), carbonDimension.getDataType());
+            logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
+          }
+          logHolder.setReason(message);
+        }
+        row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index);
+      } else {
+        row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index);
+      }
+    } catch (DictionaryGenerationException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  @Override
+  public void fillColumnCardinality(List<Integer> cardinality) {
+    cardinality.add(dictionaryGenerator.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..24c2f00
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/DirectDictionaryFieldConverterImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
+
+  private DirectDictionaryGenerator directDictionaryGenerator;
+
+  private int index;
+
+  private String nullFormat;
+
+  private CarbonColumn column;
+  private boolean isEmptyBadRecord;
+
+  public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index,
+      boolean isEmptyBadRecord) {
+    this.nullFormat = nullFormat;
+    this.column = dataField.getColumn();
+    if (dataField.getDateFormat() != null && !dataField.getDateFormat().isEmpty()) {
+      this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(dataField.getColumn().getDataType(),
+              dataField.getDateFormat());
+
+    } else {
+      this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
+          .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
+    }
+    this.index = index;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+  }
+
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+    String value = row.getString(index);
+    if (value == null) {
+      logHolder.setReason(
+          CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
+      row.update(1, index);
+    } else if (value.equals(nullFormat)) {
+      row.update(1, index);
+    } else {
+      int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
+      if (key == 1) {
+        if ((value.length() > 0) || (value.length() == 0 && isEmptyBadRecord)) {
+          String message = logHolder.getColumnMessageMap().get(column.getColName());
+          if (null == message) {
+            message = CarbonDataProcessorUtil.prepareFailureReason(
+                column.getColName(), column.getDataType());
+            logHolder.getColumnMessageMap().put(column.getColName(), message);
+          }
+          logHolder.setReason(message);
+        }
+      }
+      row.update(key, index);
+    }
+  }
+
+  @Override
+  public void fillColumnCardinality(List<Integer> cardinality) {
+    cardinality.add(Integer.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
new file mode 100644
index 0000000..2efbe26
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.processing.datatypes.ArrayDataType;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
+import org.apache.carbondata.processing.datatypes.StructDataType;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+
+public class FieldEncoderFactory {
+
+  private static FieldEncoderFactory instance;
+
+  private FieldEncoderFactory() {
+
+  }
+
+  public static FieldEncoderFactory getInstance() {
+    if (instance == null) {
+      instance = new FieldEncoderFactory();
+    }
+    return instance;
+  }
+
+  /**
+   * Creates the FieldConverter for the given column.
+   *
+   * @param dataField             column schema
+   * @param cache                 dictionary cache.
+   * @param carbonTableIdentifier table identifier
+   * @param index                 index of column in the row.
+   * @param isEmptyBadRecord      whether an empty value should be treated as a bad record
+   * @return the FieldConverter for the column
+   */
+  public FieldConverter createFieldEncoder(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
+      DictionaryClient client, Boolean useOnePass, String storePath,
+      Map<Object, Integer> localCache, boolean isEmptyBadRecord)
+      throws IOException {
+    // Create the appropriate converter depending on whether the column is a dimension or a measure.
+    if (dataField.getColumn().isDimension()) {
+      if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+          !dataField.getColumn().isComplex()) {
+        return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
+            isEmptyBadRecord);
+      } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
+          !dataField.getColumn().isComplex()) {
+        return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
+            index, client, useOnePass, storePath, localCache, isEmptyBadRecord);
+      } else if (dataField.getColumn().isComplex()) {
+        return new ComplexFieldConverterImpl(
+            createComplexType(dataField, cache, carbonTableIdentifier,
+                    client, useOnePass, storePath, localCache), index);
+      } else {
+        return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+      }
+    } else {
+      return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
+    }
+  }
+
+  /**
+   * Create parser for the carbon column.
+   */
+  private static GenericDataType createComplexType(DataField dataField,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      String storePath, Map<Object, Integer> localCache) {
+    return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
+        carbonTableIdentifier, client, useOnePass, storePath, localCache);
+  }
+
+  /**
+   * This method may be called recursively if the carbon column is complex type.
+   *
+   * @return GenericDataType
+   */
+  private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
+      String storePath, Map<Object, Integer> localCache) {
+    switch (carbonColumn.getDataType()) {
+      case ARRAY:
+        List<CarbonDimension> listOfChildDimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create array parser with complex delimiter
+        ArrayDataType arrayDataType =
+            new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+        for (CarbonDimension dimension : listOfChildDimensions) {
+          arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+              carbonTableIdentifier, client, useOnePass, storePath, localCache));
+        }
+        return arrayDataType;
+      case STRUCT:
+        List<CarbonDimension> dimensions =
+            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
+        // Create struct parser with complex delimiter
+        StructDataType structDataType =
+            new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
+        for (CarbonDimension dimension : dimensions) {
+          structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
+              carbonTableIdentifier, client, useOnePass, storePath, localCache));
+        }
+        return structDataType;
+      case MAP:
+        throw new UnsupportedOperationException("Complex type Map is not supported yet");
+      default:
+        return new PrimitiveDataType(carbonColumn.getColName(), parentName,
+            carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
+            carbonTableIdentifier, client, useOnePass, storePath, localCache);
+    }
+  }
+}
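
As a rough illustration of how a caller obtains one converter per column, a minimal sketch is shown below; it assumes a two-pass load (no dictionary server), and the "\\N" null format, local cache and flags are example values supplied by the caller (RowConverterImpl later in this commit wires them from the load configuration):

    import java.io.IOException;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.carbondata.core.cache.Cache;
    import org.apache.carbondata.core.cache.dictionary.Dictionary;
    import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
    import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    import org.apache.carbondata.processing.loading.DataField;
    import org.apache.carbondata.processing.loading.converter.FieldConverter;
    import org.apache.carbondata.processing.loading.converter.impl.FieldEncoderFactory;

    // Sketch only: all arguments are assumed to be supplied by the data loading step.
    final class FieldEncoderUsageSketch {
      static FieldConverter converterFor(DataField field, int columnIndex,
          Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictionaryCache,
          CarbonTableIdentifier tableIdentifier, String storePath) throws IOException {
        return FieldEncoderFactory.getInstance().createFieldEncoder(field, dictionaryCache,
            tableIdentifier, columnIndex, "\\N", null /* no DictionaryClient */, Boolean.FALSE,
            storePath, new ConcurrentHashMap<Object, Integer>(), false /* isEmptyBadRecord */);
      }
    }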

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
new file mode 100644
index 0000000..06f7589
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Converter for measure
+ */
+public class MeasureFieldConverterImpl implements FieldConverter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
+
+  private int index;
+
+  private DataType dataType;
+
+  private CarbonMeasure measure;
+
+  private String nullformat;
+
+  private boolean isEmptyBadRecord;
+
+  public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
+      boolean isEmptyBadRecord) {
+    this.dataType = dataField.getColumn().getDataType();
+    this.measure = (CarbonMeasure) dataField.getColumn();
+    this.nullformat = nullformat;
+    this.index = index;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+  }
+
+  @Override
+  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
+      throws CarbonDataLoadingException {
+    String value = row.getString(index);
+    Object output;
+    boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(value);
+    if (value == null || isNull) {
+      String message = logHolder.getColumnMessageMap().get(measure.getColName());
+      if (null == message) {
+        message = CarbonDataProcessorUtil
+            .prepareFailureReason(measure.getColName(), measure.getDataType());
+        logHolder.getColumnMessageMap().put(measure.getColName(), message);
+      }
+      row.update(null, index);
+    } else if (value.length() == 0) {
+      if (isEmptyBadRecord) {
+        String message = logHolder.getColumnMessageMap().get(measure.getColName());
+        if (null == message) {
+          message = CarbonDataProcessorUtil
+              .prepareFailureReason(measure.getColName(), measure.getDataType());
+          logHolder.getColumnMessageMap().put(measure.getColName(), message);
+        }
+        logHolder.setReason(message);
+      }
+      row.update(null, index);
+    } else if (value.equals(nullformat)) {
+      row.update(null, index);
+    } else {
+      try {
+        output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+        row.update(output, index);
+      } catch (NumberFormatException e) {
+        LOGGER.warn(
+            "Cannot convert value to numeric type. Value is considered as null.");
+        logHolder.setReason(
+            CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
+        output = null;
+        row.update(output, index);
+      }
+    }
+
+  }
+}
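
A minimal standalone sketch of the value-handling rules above (not the CarbonData implementation; the real converter parses with DataTypeUtil.getMeasureValueBasedOnDataType and records reasons in BadRecordLogHolder):

    import java.math.BigDecimal;

    public final class MeasureConversionSketch {

      // Mirrors the branches above: null, the configured null format and empty strings
      // all become null measures; unparsable values become null and would be reported.
      static Object convert(String value, String nullFormat, boolean isEmptyBadRecord) {
        if (value == null || value.equals(nullFormat)) {
          return null;
        }
        if (value.length() == 0) {
          return null;              // flagged as a bad record only when isEmptyBadRecord is true
        }
        try {
          return new BigDecimal(value);   // stand-in for the data-type-specific parsing
        } catch (NumberFormatException e) {
          return null;
        }
      }

      public static void main(String[] args) {
        System.out.println(convert("12.5", "\\N", false)); // 12.5
        System.out.println(convert("\\N", "\\N", false));  // null
        System.out.println(convert("", "\\N", true));      // null (bad record)
      }
    }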

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
new file mode 100644
index 0000000..8170680
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class NonDictionaryFieldConverterImpl implements FieldConverter {
+
+  private DataType dataType;
+
+  private int index;
+
+  private String nullformat;
+
+  private CarbonColumn column;
+
+  private boolean isEmptyBadRecord;
+
+  private DataField dataField;
+
+  public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index,
+      boolean isEmptyBadRecord) {
+    this.dataField = dataField;
+    this.dataType = dataField.getColumn().getDataType();
+    this.column = dataField.getColumn();
+    this.index = index;
+    this.nullformat = nullformat;
+    this.isEmptyBadRecord = isEmptyBadRecord;
+  }
+
+  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
+    String dimensionValue = row.getString(index);
+    if (null == dimensionValue && column.getDataType() != DataType.STRING) {
+      logHolder.setReason(
+          CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
+      updateWithNullValue(row);
+    } else if (dimensionValue == null || dimensionValue.equals(nullformat)) {
+      updateWithNullValue(row);
+    } else {
+      try {
+        row.update(DataTypeUtil
+            .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType,
+                dataField.getDateFormat()), index);
+      } catch (Throwable ex) {
+        if (dimensionValue.length() > 0 || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
+          String message = logHolder.getColumnMessageMap().get(column.getColName());
+          if (null == message) {
+            message = CarbonDataProcessorUtil
+                .prepareFailureReason(column.getColName(), column.getDataType());
+            logHolder.getColumnMessageMap().put(column.getColName(), message);
+          }
+          logHolder.setReason(message);
+          updateWithNullValue(row);
+        } else {
+          updateWithNullValue(row);
+        }
+      }
+    }
+  }
+
+  private void updateWithNullValue(CarbonRow row) {
+    if (dataType == DataType.STRING) {
+      row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
+    } else {
+      row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
new file mode 100644
index 0000000..a4351ae
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/RowConverterImpl.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.converter.impl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.loading.converter.FieldConverter;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+/**
+ * Converts the complete row if necessary: dictionary columns are encoded with dictionary
+ * values and non-dictionary values are converted to binary.
+ */
+public class RowConverterImpl implements RowConverter {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowConverterImpl.class.getName());
+
+  private CarbonDataLoadConfiguration configuration;
+
+  private DataField[] fields;
+
+  private FieldConverter[] fieldConverters;
+
+  private BadRecordsLogger badRecordLogger;
+
+  private BadRecordLogHolder logHolder;
+
+  private List<DictionaryClient> dictClients = new ArrayList<>();
+
+  private ExecutorService executorService;
+
+  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
+
+  private Map<Object, Integer>[] localCaches;
+
+  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
+      BadRecordsLogger badRecordLogger) {
+    this.fields = fields;
+    this.configuration = configuration;
+    this.badRecordLogger = badRecordLogger;
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    CacheProvider cacheProvider = CacheProvider.getInstance();
+    cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
+        configuration.getTableIdentifier().getStorePath());
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    boolean isEmptyBadRecord = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+            .toString());
+    List<FieldConverter> fieldConverterList = new ArrayList<>();
+    localCaches = new Map[fields.length];
+    long lruCacheStartTime = System.currentTimeMillis();
+    DictionaryClient client = createDictionaryClient();
+    dictClients.add(client);
+
+    for (int i = 0; i < fields.length; i++) {
+      localCaches[i] = new ConcurrentHashMap<>();
+      FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
+          .createFieldEncoder(fields[i], cache,
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+              configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
+              localCaches[i], isEmptyBadRecord);
+      fieldConverterList.add(fieldConverter);
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
+    fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+    logHolder = new BadRecordLogHolder();
+  }
+
+  private DictionaryClient createDictionaryClient() {
+    // for one pass load, start the dictionary client
+    if (configuration.getUseOnePass()) {
+      if (executorService == null) {
+        executorService = Executors.newCachedThreadPool();
+      }
+      Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
+        @Override
+        public DictionaryClient call() throws Exception {
+          Thread.currentThread().setName("Dictionary client");
+          DictionaryClient dictionaryClient = new DictionaryClient();
+          dictionaryClient.startClient(configuration.getDictionaryServerHost(),
+              configuration.getDictionaryServerPort());
+          return dictionaryClient;
+        }
+      });
+
+      try {
+        // wait for the client initialization to finish, otherwise a NullPointerException may occur
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOGGER.error(e);
+        throw new RuntimeException(e);
+      }
+
+      try {
+        return result.get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+    // TODO: copy the row only if it turns out to be a bad record
+    CarbonRow copy = row.getCopy();
+    logHolder.setLogged(false);
+    logHolder.clear();
+    for (int i = 0; i < fieldConverters.length; i++) {
+      fieldConverters[i].convert(row, logHolder);
+      if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
+        badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
+        if (badRecordLogger.isDataLoadFail()) {
+          String error = "Data load failed due to bad record: " + logHolder.getReason() +
+              ". Please enable the bad record logger to know the detailed reason.";
+          throw new BadRecordFoundException(error);
+        }
+        logHolder.clear();
+        logHolder.setLogged(true);
+        if (badRecordLogger.isBadRecordConvertNullDisable()) {
+          return null;
+        }
+      }
+    }
+    return row;
+  }
+
+  @Override
+  public void finish() {
+    // close the dictionary client when the write is finished
+    if (configuration.getUseOnePass()) {
+      for (DictionaryClient client : dictClients) {
+        if (client != null) {
+          client.shutDown();
+        }
+      }
+      if (null != logHolder) {
+        logHolder.finish();
+      }
+      if (executorService != null) {
+        executorService.shutdownNow();
+        executorService = null;
+      }
+    }
+  }
+
+  @Override
+  public RowConverter createCopyForNewThread() {
+    RowConverterImpl converter =
+        new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger);
+    List<FieldConverter> fieldConverterList = new ArrayList<>();
+    DictionaryClient client = createDictionaryClient();
+    dictClients.add(client);
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    boolean isEmptyBadRecord = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
+            .toString());
+    for (int i = 0; i < fields.length; i++) {
+      FieldConverter fieldConverter = null;
+      try {
+        fieldConverter = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
+            configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
+            configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
+            localCaches[i], isEmptyBadRecord);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      fieldConverterList.add(fieldConverter);
+    }
+    converter.fieldConverters =
+        fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+    converter.logHolder = new BadRecordLogHolder();
+    return converter;
+  }
+
+  @Override public int[] getCardinality() {
+    List<Integer> dimCardinality = new ArrayList<>();
+    if (fieldConverters != null) {
+      for (int i = 0; i < fieldConverters.length; i++) {
+        if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
+          ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
+              .fillColumnCardinality(dimCardinality);
+        }
+      }
+    }
+    int[] cardinality = new int[dimCardinality.size()];
+    for (int i = 0; i < dimCardinality.size(); i++) {
+      cardinality[i] = dimCardinality.get(i);
+    }
+    return cardinality;
+  }
+}
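
A hedged sketch of the typical lifecycle, as seen from the calls this class exposes (field, configuration and bad-record-logger setup are assumed to be done by the surrounding data loading step; the row iterable is a placeholder):

    import java.io.IOException;

    import org.apache.carbondata.core.datastore.row.CarbonRow;
    import org.apache.carbondata.processing.loading.BadRecordsLogger;
    import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    import org.apache.carbondata.processing.loading.DataField;
    import org.apache.carbondata.processing.loading.converter.RowConverter;
    import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;

    final class RowConverterUsageSketch {
      static void convertAll(DataField[] fields, CarbonDataLoadConfiguration configuration,
          BadRecordsLogger badRecordLogger, Iterable<CarbonRow> rows) throws IOException {
        RowConverter converter = new RowConverterImpl(fields, configuration, badRecordLogger);
        converter.initialize();                 // builds one FieldConverter per column
        try {
          for (CarbonRow row : rows) {
            CarbonRow converted = converter.convert(row);
            if (converted == null) {
              continue;                         // bad record dropped by the converter
            }
            // hand the converted row to the next step (e.g. sort / write)
          }
        } finally {
          converter.finish();                   // shuts down dictionary clients for one-pass loads
        }
      }
    }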

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java
new file mode 100644
index 0000000..d0c8a73
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BlockDetails.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * Holds the offset, length, file path and locations of a block of the input file.
+ */
+public class BlockDetails extends FileSplit implements Serializable {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 2293906691860002339L;
+  //block offset
+  private long blockOffset;
+  //block length
+  private long blockLength;
+  //file path which block belong to
+  private String filePath;
+  // locations where this block exists
+  private String[] locations;
+
+  public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
+    super(filePath, blockOffset, blockLength, locations);
+    this.filePath = filePath.toString();
+    this.blockOffset = blockOffset;
+    this.blockLength = blockLength;
+    this.locations = locations;
+  }
+
+  public long getBlockOffset() {
+    return blockOffset;
+  }
+
+  public long getBlockLength() {
+    return blockLength;
+  }
+
+  public String getFilePath() {
+    return FileFactory.getUpdatedFilePath(filePath);
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public String[] getLocations() {
+    return locations;
+  }
+
+  /** The file containing this split's data. */
+  @Override
+  public Path getPath() { return new Path(filePath); }
+
+  /** The position of the first byte in the file to process. */
+  @Override
+  public long getStart() { return blockOffset; }
+
+  /** The number of bytes in the file to process. */
+  @Override
+  public long getLength() { return blockLength; }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java
new file mode 100644
index 0000000..6fe9107
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/BoundedInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.csvinput;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Customized reader class to read data from a file
+ * until the upper threshold is reached.
+ */
+public class BoundedInputStream extends InputStream {
+
+  /**
+   * byte value of the new line character
+   */
+  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
+
+  /**
+   * number of extra characters to read
+   */
+  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
+
+  /**
+   * number of bytes remaining
+   */
+  private long remaining;
+  /**
+   * to check whether end of line is found
+   */
+  private boolean endOfLineFound = false;
+
+  private DataInputStream in;
+
+  public BoundedInputStream(DataInputStream in, long limit) {
+    this.in = in;
+    this.remaining = limit;
+  }
+
+  /**
+   * Below method will be used to read the data from file
+   *
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read() throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      int byteRead = this.in.read();
+      if (byteRead >= 0) {
+        --this.remaining;
+      }
+
+      return byteRead;
+    }
+  }
+
+  /**
+   * Below method will be used to read the data from file. Once the limit is
+   * reached, it keeps reading until a new line character is found
+   *
+   * @param buffer
+   *          buffer into which data will be read
+   * @param offset
+   *          position from which the buffer will be filled
+   * @param length
+   *          number of bytes to be read
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      if (this.remaining < length) {
+        length = (int) this.remaining;
+      }
+
+      length = this.in.read(buffer, offset, length);
+      if (length >= 0) {
+        this.remaining -= length;
+        if (this.remaining == 0 && !endOfLineFound) {
+          endOfLineFound = true;
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        } else if (endOfLineFound) {
+          int end = offset + length;
+          for (int i = offset; i < end; i++) {
+            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
+              this.remaining = 0;
+              return (i - offset) + 1;
+            }
+          }
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        }
+      }
+      return length;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
+
+  public long getRemaining() {
+    return this.remaining;
+  }
+
+}
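
A usage sketch (assumed, not taken from this commit): the CSV reading step opens the file, seeks to the block's start offset and bounds the stream by the block length, so reading stops at the boundary except for the extra bytes needed to finish the last line:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;

    import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
    import org.apache.carbondata.processing.loading.csvinput.BoundedInputStream;

    // Sketch only; charset handling and record parsing are left to the CSV reader.
    final class BoundedInputStreamSketch {
      static BoundedInputStream openBlock(BlockDetails block, Configuration conf) throws IOException {
        Path path = block.getPath();
        FSDataInputStream in = path.getFileSystem(conf).open(path);
        in.seek(block.getBlockOffset());        // jump to the start of this block
        // FSDataInputStream is a DataInputStream, so it can be bounded directly
        return new BoundedInputStream(in, block.getBlockLength());
      }
    }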