Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:19 UTC

[32/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
deleted file mode 100644
index 50bfaff..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.BucketingInfo;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.processing.newflow.converter.DictionaryCardinalityFinder;
-
-public class CarbonDataLoadConfiguration {
-
-  private DataField[] dataFields;
-
-  private AbsoluteTableIdentifier tableIdentifier;
-
-  private String[] header;
-
-  private String partitionId;
-
-  private String segmentId;
-
-  private String taskNo;
-
-  private BucketingInfo bucketingInfo;
-
-  private Map<String, Object> dataLoadProperties = new HashMap<>();
-
-  /**
-   *  Use one pass to generate dictionary
-   */
-  private boolean useOnePass;
-
-  /**
-   * dictionary server host
-   */
-  private String dictionaryServerHost;
-
-  /**
-   * dictionary server port
-   */
-  private int dictionaryServerPort;
-
-  private boolean preFetch;
-
-  private int dimensionCount;
-
-  private int measureCount;
-
-  private int noDictionaryCount;
-
-  private int complexColumnCount;
-
-  /**
-   * schema updated time stamp to be used for restructure scenarios
-   */
-  private long schemaUpdatedTimeStamp;
-
-  private DictionaryCardinalityFinder cardinalityFinder;
-
-  private int numberOfSortColumns;
-
-  private int numberOfNoDictSortColumns;
-
-  // contains metadata used in write step of loading process
-  private TableSpec tableSpec;
-
-  public CarbonDataLoadConfiguration() {
-  }
-
-  public void setDataFields(DataField[] dataFields) {
-    this.dataFields = dataFields;
-
-    // set counts for each column category
-    for (DataField dataField : dataFields) {
-      CarbonColumn column = dataField.getColumn();
-      if (column.isDimension()) {
-        dimensionCount++;
-        if (!dataField.hasDictionaryEncoding()) {
-          noDictionaryCount++;
-        }
-      }
-      if (column.isComplex()) {
-        complexColumnCount++;
-      }
-      if (column.isMeasure()) {
-        measureCount++;
-      }
-    }
-  }
-
-  public DataField[] getDataFields() {
-    return dataFields;
-  }
-
-  public int getDimensionCount() {
-    return dimensionCount;
-  }
-
-  public int getNoDictionaryCount() {
-    return noDictionaryCount;
-  }
-
-  public int getComplexColumnCount() {
-    return complexColumnCount;
-  }
-
-  public int getMeasureCount() {
-    return measureCount;
-  }
-
-  public void setNumberOfSortColumns(int numberOfSortColumns) {
-    this.numberOfSortColumns = numberOfSortColumns;
-  }
-
-  public int getNumberOfSortColumns() {
-    return this.numberOfSortColumns;
-  }
-
-  public boolean isSortTable() {
-    return this.numberOfSortColumns > 0;
-  }
-
-  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
-    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
-  }
-
-  public int getNumberOfNoDictSortColumns() {
-    return this.numberOfNoDictSortColumns;
-  }
-
-  public String[] getHeader() {
-    return header;
-  }
-
-  public void setHeader(String[] header) {
-    this.header = header;
-  }
-
-  public AbsoluteTableIdentifier getTableIdentifier() {
-    return tableIdentifier;
-  }
-
-  public void setTableIdentifier(AbsoluteTableIdentifier tableIdentifier) {
-    this.tableIdentifier = tableIdentifier;
-  }
-
-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
-  public String getTaskNo() {
-    return taskNo;
-  }
-
-  public void setTaskNo(String taskNo) {
-    this.taskNo = taskNo;
-  }
-
-  public void setDataLoadProperty(String key, Object value) {
-    dataLoadProperties.put(key, value);
-  }
-
-  public Object getDataLoadProperty(String key) {
-    return dataLoadProperties.get(key);
-  }
-
-  public BucketingInfo getBucketingInfo() {
-    return bucketingInfo;
-  }
-
-  public void setBucketingInfo(BucketingInfo bucketingInfo) {
-    this.bucketingInfo = bucketingInfo;
-  }
-
-  public boolean getUseOnePass() {
-    return useOnePass;
-  }
-
-  public void setUseOnePass(boolean useOnePass) {
-    this.useOnePass = useOnePass;
-  }
-
-  public String getDictionaryServerHost() {
-    return dictionaryServerHost;
-  }
-
-  public void setDictionaryServerHost(String dictionaryServerHost) {
-    this.dictionaryServerHost = dictionaryServerHost;
-  }
-
-  public int getDictionaryServerPort() {
-    return dictionaryServerPort;
-  }
-
-  public void setDictionaryServerPort(int dictionaryServerPort) {
-    this.dictionaryServerPort = dictionaryServerPort;
-  }
-
-  public boolean isPreFetch() {
-    return preFetch;
-  }
-
-  public void setPreFetch(boolean preFetch) {
-    this.preFetch = preFetch;
-  }
-
-  public long getSchemaUpdatedTimeStamp() {
-    return schemaUpdatedTimeStamp;
-  }
-
-  public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
-    this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
-  }
-
-  public DictionaryCardinalityFinder getCardinalityFinder() {
-    return cardinalityFinder;
-  }
-
-  public void setCardinalityFinder(DictionaryCardinalityFinder cardinalityFinder) {
-    this.cardinalityFinder = cardinalityFinder;
-  }
-
-  public DataType[] getMeasureDataType() {
-    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
-    int measureCount = 0;
-    for (int i = 0; i < dataFields.length; i++) {
-      if (!dataFields[i].getColumn().isDimension()) {
-        measureIndexes.add(i);
-        measureCount++;
-      }
-    }
-
-    DataType[] type = new DataType[measureCount];
-    for (int i = 0; i < type.length; i++) {
-      type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
-    }
-    return type;
-  }
-
-  public int[] calcDimensionLengths() {
-    int[] dimLensWithComplex = getCardinalityFinder().getCardinality();
-    if (!isSortTable()) {
-      for (int i = 0; i < dimLensWithComplex.length; i++) {
-        if (dimLensWithComplex[i] != 0) {
-          dimLensWithComplex[i] = Integer.MAX_VALUE;
-        }
-      }
-    }
-    List<Integer> dimsLenList = new ArrayList<Integer>();
-    for (int eachDimLen : dimLensWithComplex) {
-      if (eachDimLen != 0) dimsLenList.add(eachDimLen);
-    }
-    int[] dimLens = new int[dimsLenList.size()];
-    for (int i = 0; i < dimsLenList.size(); i++) {
-      dimLens[i] = dimsLenList.get(i);
-    }
-    return dimLens;
-  }
-
-  public KeyGenerator[] createKeyGeneratorForComplexDimension() {
-    int[] dimLens = calcDimensionLengths();
-    KeyGenerator[] complexKeyGenerators = new KeyGenerator[dimLens.length];
-    for (int i = 0; i < dimLens.length; i++) {
-      complexKeyGenerators[i] =
-          KeyGeneratorFactory.getKeyGenerator(new int[] { dimLens[i] });
-    }
-    return complexKeyGenerators;
-  }
-
-  public TableSpec getTableSpec() {
-    return tableSpec;
-  }
-
-  public void setTableSpec(TableSpec tableSpec) {
-    this.tableSpec = tableSpec;
-  }
-}

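For reference, a minimal sketch of how the removed CarbonDataLoadConfiguration is typically populated and queried; the buildDataFieldsFromSchema helper and the surrounding variables are assumptions for illustration, not part of the class above.

CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
configuration.setSegmentId("0");
configuration.setTaskNo("1");
configuration.setNumberOfSortColumns(2);
// one DataField per CarbonColumn of the table; buildDataFieldsFromSchema is a hypothetical helper
DataField[] dataFields = buildDataFieldsFromSchema(carbonTable);
// setDataFields also derives the dimension/measure/no-dictionary/complex counts
configuration.setDataFields(dataFields);
boolean sortTable = configuration.isSortTable();              // true while numberOfSortColumns > 0
int dimensionCount = configuration.getDimensionCount();
DataType[] measureTypes = configuration.getMeasureDataType(); // data types of the non-dimension fields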
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
deleted file mode 100644
index 892055b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-
-/**
- * Metadata class for each column of the table.
- */
-public class DataField implements Serializable {
-
-  public DataField(CarbonColumn column) {
-    this.column = column;
-  }
-
-  private CarbonColumn column;
-
-  private String dateFormat;
-
-  public boolean hasDictionaryEncoding() {
-    return column.hasEncoding(Encoding.DICTIONARY);
-  }
-
-  public CarbonColumn getColumn() {
-    return column;
-  }
-
-  public String getDateFormat() {
-    return dateFormat;
-  }
-
-  public void setDateFormat(String dateFormat) {
-    this.dateFormat = dateFormat;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
deleted file mode 100644
index 36a89b5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.exception.NoRetryException;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
-
-/**
- * It executes the data load.
- */
-public class DataLoadExecutor {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataLoadExecutor.class.getName());
-
-  private AbstractDataLoadProcessorStep loadProcessorStep;
-
-  private boolean isClosed;
-
-  public void execute(CarbonLoadModel loadModel, String[] storeLocation,
-      CarbonIterator<Object[]>[] inputIterators) throws Exception {
-    try {
-      loadProcessorStep =
-          new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
-      // 1. initialize
-      loadProcessorStep.initialize();
-      LOGGER.info("Data Loading is started for table " + loadModel.getTableName());
-      // 2. execute the step
-      loadProcessorStep.execute();
-      // check whether any bad record was logged for this table (removal happens in the finally block)
-      if (badRecordFound(
-          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier())) {
-        LOGGER.error("Data Load is partially success for table " + loadModel.getTableName());
-      } else {
-        LOGGER.info("Data loading is successful for table " + loadModel.getTableName());
-      }
-    } catch (CarbonDataLoadingException e) {
-      if (e instanceof BadRecordFoundException) {
-        throw new NoRetryException(e.getMessage());
-      } else {
-        throw e;
-      }
-    } catch (Exception e) {
-      LOGGER.error(e, "Data Loading failed for table " + loadModel.getTableName());
-      throw new CarbonDataLoadingException(
-          "Data Loading failed for table " + loadModel.getTableName(), e);
-    } finally {
-      removeBadRecordKey(
-          loadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier());
-    }
-  }
-
-  /**
-   * This method checks whether any bad record was found for the given table
-   *
-   * @param carbonTableIdentifier identifier of the table whose bad record logger key is checked
-   * @return true if a bad record entry exists for this table
-   */
-  private boolean badRecordFound(CarbonTableIdentifier carbonTableIdentifier) {
-    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
-    boolean badRecordKeyFound = false;
-    if (null != BadRecordsLogger.hasBadRecord(badRecordLoggerKey)) {
-      badRecordKeyFound = true;
-    }
-    return badRecordKeyFound;
-  }
-
-  /**
-   * This method will remove the bad record key from bad record logger
-   *
-   * @param carbonTableIdentifier
-   */
-  private void removeBadRecordKey(CarbonTableIdentifier carbonTableIdentifier) {
-    String badRecordLoggerKey = carbonTableIdentifier.getBadRecordLoggerKey();
-    BadRecordsLogger.removeBadRecordKey(badRecordLoggerKey);
-  }
-
-  /**
-   * Method to clean up all the resources
-   */
-  public void close() {
-    if (!isClosed && loadProcessorStep != null) {
-      loadProcessorStep.close();
-    }
-    isClosed = true;
-  }
-}

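For context, a minimal sketch of driving the removed DataLoadExecutor, assuming loadModel, storeLocation and inputIterators have already been prepared by the caller:

DataLoadExecutor executor = new DataLoadExecutor();
try {
  // builds the step pipeline via DataLoadProcessBuilder, initializes it and runs the load
  executor.execute(loadModel, storeLocation, inputIterators);
} catch (NoRetryException e) {
  // thrown when a bad record is found; the load should not be retried
  throw e;
} finally {
  // releases the resources held by the step pipeline
  executor.close();
}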
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
deleted file mode 100644
index ccb25e6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
-import org.apache.carbondata.processing.newflow.steps.CarbonRowDataWriterProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataWriterBatchProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
-import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * It builds the pipeline of steps for loading data into CarbonData.
- */
-public final class DataLoadProcessBuilder {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataLoadProcessBuilder.class.getName());
-
-  public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] storeLocation,
-      CarbonIterator[] inputIterators) throws Exception {
-    CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
-    SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
-    if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
-      return buildInternalForNoSort(inputIterators, configuration);
-    } else if (configuration.getBucketingInfo() != null) {
-      return buildInternalForBucketing(inputIterators, configuration);
-    } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
-      return buildInternalForBatchSort(inputIterators, configuration);
-    } else {
-      return buildInternal(inputIterators, configuration);
-    }
-  }
-
-  private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
-      CarbonDataLoadConfiguration configuration) {
-    // 1. Reads the data input iterators and parses the data.
-    AbstractDataLoadProcessorStep inputProcessorStep =
-        new InputProcessorStepImpl(configuration, inputIterators);
-    // 2. Converts the data to dictionary, non-dictionary or complex values depending on the
-    // data types and configuration.
-    AbstractDataLoadProcessorStep converterProcessorStep =
-        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
-    // 3. Sorts the data by SortColumn
-    AbstractDataLoadProcessorStep sortProcessorStep =
-        new SortProcessorStepImpl(configuration, converterProcessorStep);
-    // 4. Writes the sorted data in carbondata format.
-    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
-  }
-
-  private AbstractDataLoadProcessorStep buildInternalForNoSort(CarbonIterator[] inputIterators,
-      CarbonDataLoadConfiguration configuration) {
-    // 1. Reads the data input iterators and parses the data.
-    AbstractDataLoadProcessorStep inputProcessorStep =
-        new InputProcessorStepImpl(configuration, inputIterators);
-    // 2. Converts the data to dictionary, non-dictionary or complex values depending on the
-    // data types and configuration.
-    AbstractDataLoadProcessorStep converterProcessorStep =
-        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
-    // 3. Writes the converted data in carbondata format.
-    AbstractDataLoadProcessorStep writerProcessorStep =
-        new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
-    return writerProcessorStep;
-  }
-
-  private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
-      CarbonDataLoadConfiguration configuration) {
-    // 1. Reads the data input iterators and parses the data.
-    AbstractDataLoadProcessorStep inputProcessorStep =
-        new InputProcessorStepImpl(configuration, inputIterators);
-    // 2. Converts the data to dictionary, non-dictionary or complex values depending on the
-    // data types and configuration.
-    AbstractDataLoadProcessorStep converterProcessorStep =
-        new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
-    // 3. Sorts the data by the sort columns, depending on the sort scope
-    AbstractDataLoadProcessorStep sortProcessorStep =
-        new SortProcessorStepImpl(configuration, converterProcessorStep);
-    // 4. Writes the sorted data in carbondata format.
-    return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
-  }
-
-  private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators,
-      CarbonDataLoadConfiguration configuration) throws Exception {
-    // 1. Reads the data input iterators and parses the data.
-    AbstractDataLoadProcessorStep inputProcessorStep =
-        new InputProcessorStepImpl(configuration, inputIterators);
-    // 2. Converts the data to dictionary, non-dictionary or complex values depending on the
-    // data types and configuration.
-    AbstractDataLoadProcessorStep converterProcessorStep =
-        new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep);
-    // 3. Sorts the data by the sort columns, depending on the sort scope
-    AbstractDataLoadProcessorStep sortProcessorStep =
-        new SortProcessorStepImpl(configuration, converterProcessorStep);
-    // 4. Writes the sorted data in carbondata format.
-    return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
-  }
-
-  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel,
-      String[] storeLocation) {
-    CarbonDataProcessorUtil.createLocations(storeLocation);
-
-    String databaseName = loadModel.getDatabaseName();
-    String tableName = loadModel.getTableName();
-    String tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
-            loadModel.getTaskNo(), false, false);
-    CarbonProperties.getInstance().addProperty(tempLocationKey,
-        StringUtils.join(storeLocation, File.pathSeparator));
-    CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());
-
-    return createConfiguration(loadModel);
-  }
-
-  public static CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel) {
-    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
-    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-    configuration.setTableIdentifier(identifier);
-    configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
-    configuration.setHeader(loadModel.getCsvHeaderColumns());
-    configuration.setPartitionId(loadModel.getPartitionId());
-    configuration.setSegmentId(loadModel.getSegmentId());
-    configuration.setTaskNo(loadModel.getTaskNo());
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
-        new String[] { loadModel.getComplexDelimiterLevel1(),
-            loadModel.getComplexDelimiterLevel2() });
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
-        loadModel.getSerializationNullFormat().split(",")[1]);
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
-        loadModel.getFactTimeStamp());
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE,
-        loadModel.getBadRecordsLoggerEnable().split(",")[1]);
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
-        loadModel.getBadRecordsAction().split(",")[1]);
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD,
-        loadModel.getIsEmptyDataBadRecord().split(",")[1]);
-    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
-        loadModel.getFactFilePath());
-    configuration
-        .setDataLoadProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, loadModel.getSortScope());
-    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-        loadModel.getBatchSortSizeInMb());
-    configuration.setDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        loadModel.getGlobalSortPartitions());
-    configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-        loadModel.getBadRecordsLocation());
-    CarbonMetadata.getInstance().addCarbonTable(carbonTable);
-    List<CarbonDimension> dimensions =
-        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
-    List<CarbonMeasure> measures =
-        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
-    Map<String, String> dateFormatMap =
-        CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
-    List<DataField> dataFields = new ArrayList<>();
-    List<DataField> complexDataFields = new ArrayList<>();
-
-    // First add dictionary and non-dictionary dimensions because these are part of the MDK.
-    // Then add complex data types and measures.
-    for (CarbonColumn column : dimensions) {
-      DataField dataField = new DataField(column);
-      dataField.setDateFormat(dateFormatMap.get(column.getColName()));
-      if (column.isComplex()) {
-        complexDataFields.add(dataField);
-      } else {
-        dataFields.add(dataField);
-      }
-    }
-    dataFields.addAll(complexDataFields);
-    for (CarbonColumn column : measures) {
-      // This dummy measure is added when no measure was present. We do not need to load it.
-      if (!(column.getColName().equals("default_dummy_measure"))) {
-        dataFields.add(new DataField(column));
-      }
-    }
-    configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
-    configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
-    // configuration for one pass load: dictionary server info
-    configuration.setUseOnePass(loadModel.getUseOnePass());
-    configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
-    configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
-    configuration.setPreFetch(loadModel.isPreFetch());
-    configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
-    configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
-
-    TableSpec tableSpec = new TableSpec(dimensions, measures);
-    configuration.setTableSpec(tableSpec);
-    return configuration;
-  }
-
-}

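To summarize the removed builder, a hedged sketch of how it is invoked and which pipeline build() assembles for each configuration (exception handling omitted):

// build a load configuration from the load model and temporary store locations
CarbonDataLoadConfiguration configuration =
    DataLoadProcessBuilder.createConfiguration(loadModel, storeLocation);

// build() chains the steps depending on the configuration:
//   non-sort table or NO_SORT scope : input -> convert -> CarbonRowDataWriterProcessorStepImpl
//   bucketing info present          : input -> convert (bucketing) -> sort -> DataWriterProcessorStepImpl
//   BATCH_SORT scope                : input -> convert -> sort -> DataWriterBatchProcessorStepImpl
//   otherwise (local sort)          : input -> convert -> sort -> DataWriterProcessorStepImpl
AbstractDataLoadProcessorStep pipeline =
    new DataLoadProcessBuilder().build(loadModel, storeLocation, inputIterators);
pipeline.initialize();
pipeline.execute();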
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java
deleted file mode 100644
index 196afdb..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/ArrayObject.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.complexobjects;
-
-public class ArrayObject {
-
-  private Object[] data;
-
-  public ArrayObject(Object[] data) {
-    this.data = data;
-  }
-
-  public Object[] getData() {
-    return data;
-  }
-
-  public void setData(Object[] data) {
-    this.data = data;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java
deleted file mode 100644
index d1e0f9b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/complexobjects/StructObject.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.complexobjects;
-
-public class StructObject {
-
-  private Object[] data;
-
-  public StructObject(Object[] data) {
-    this.data = data;
-  }
-
-  public Object[] getData() {
-    return data;
-  }
-
-  public void setData(Object[] data) {
-    this.data = data;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
deleted file mode 100644
index 11570b4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.constants;
-
-/**
- * Constants used in data loading.
- */
-public final class DataLoadProcessorConstants {
-
-  public static final String FACT_TIME_STAMP = "FACT_TIME_STAMP";
-
-  public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
-
-  public static final String SERIALIZATION_NULL_FORMAT = "SERIALIZATION_NULL_FORMAT";
-
-  public static final String BAD_RECORDS_LOGGER_ENABLE = "BAD_RECORDS_LOGGER_ENABLE";
-
-  public static final String BAD_RECORDS_LOGGER_ACTION = "BAD_RECORDS_LOGGER_ACTION";
-
-  public static final String IS_EMPTY_DATA_BAD_RECORD = "IS_EMPTY_DATA_BAD_RECORD";
-
-  public static final String FACT_FILE_PATH = "FACT_FILE_PATH";
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
deleted file mode 100644
index f7ce620..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/BadRecordLogHolder.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Holder for the reasons of bad records.
- */
-public class BadRecordLogHolder {
-
-  /**
-   * this map holds the unified bad record message for each column
-   */
-  private Map<String, String> columnMessageMap = new HashMap<>();
-
-  private String reason;
-
-  private boolean badRecordAdded;
-
-  private boolean isLogged;
-
-  public String getReason() {
-    return reason;
-  }
-
-  public void setReason(String reason) {
-    this.reason = reason;
-    badRecordAdded = true;
-  }
-
-  public boolean isBadRecordNotAdded() {
-    return badRecordAdded;
-  }
-
-  public void clear() {
-    this.badRecordAdded = false;
-  }
-
-  public boolean isLogged() {
-    return isLogged;
-  }
-
-  public void setLogged(boolean logged) {
-    isLogged = logged;
-  }
-
-  public Map<String, String> getColumnMessageMap() {
-    return columnMessageMap;
-  }
-
-  /**
-   * this method will clear the map entries
-   */
-  public void finish() {
-    if (null != columnMessageMap) {
-      columnMessageMap.clear();
-    }
-  }
-}

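For orientation, a small sketch of how a converter loop typically uses the removed BadRecordLogHolder; the fieldConverters collection and the logBadRecord call are illustrative placeholders, not code from this module:

BadRecordLogHolder logHolder = new BadRecordLogHolder();
for (FieldConverter fieldConverter : fieldConverters) {
  fieldConverter.convert(row, logHolder);
}
// despite its name, this returns true when setReason() was called for the current row
if (logHolder.isBadRecordNotAdded()) {
  logBadRecord(row, logHolder.getReason());
}
logHolder.clear();   // reset the per-row flag before the next row
// when the converter finishes, drop the cached per-column messages
logHolder.finish();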
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
deleted file mode 100644
index 751f909..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/DictionaryCardinalityFinder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter;
-
-/**
- * Finds the current cardinality of dimensions.
- */
-public interface DictionaryCardinalityFinder {
-
-  int[] getCardinality();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
deleted file mode 100644
index 88828be..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/FieldConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-/**
- * This interface converts/transforms the column field.
- */
-public interface FieldConverter {
-
-  /**
-   * It converts the column field and updates the data in the same location/index in the row.
-   * @param row row whose field at this converter's index is converted in place
-   * @param logHolder holds the failure reason when the value is treated as a bad record
-   * @throws CarbonDataLoadingException
-   */
-  void convert(CarbonRow row, BadRecordLogHolder logHolder) throws CarbonDataLoadingException;
-}

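A minimal illustrative implementation of this contract; only the interface itself comes from the file above, while the trimming behaviour and class name are invented for the example:

public class TrimFieldConverterImpl implements FieldConverter {

  private final int index;

  public TrimFieldConverterImpl(int index) {
    this.index = index;
  }

  @Override
  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
      throws CarbonDataLoadingException {
    String value = row.getString(index);
    if (value == null) {
      // record the reason; the caller decides whether to treat the row as a bad record
      logHolder.setReason("null value for column at index " + index);
    } else {
      // update the converted value in place, at the same index
      row.update(value.trim(), index);
    }
  }
}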
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
deleted file mode 100644
index f4876db..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/RowConverter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-/**
- * convert the row
- */
-public interface RowConverter extends DictionaryCardinalityFinder {
-
-  void initialize() throws IOException;
-
-  CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException;
-
-  RowConverter createCopyForNewThread();
-
-  void finish();
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
deleted file mode 100644
index bbbf5e6..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/AbstractDictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-
-public abstract class AbstractDictionaryFieldConverterImpl implements FieldConverter {
-
-  public abstract void fillColumnCardinality(List<Integer> cardinality);
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
deleted file mode 100644
index 8feea6a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-public class ComplexFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
-
-  private GenericDataType genericDataType;
-
-  private int index;
-
-  public ComplexFieldConverterImpl(GenericDataType genericDataType, int index) {
-    this.genericDataType = genericDataType;
-    this.index = index;
-  }
-
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
-    Object object = row.getObject(index);
-    // TODO This is temporary, needs refactoring here.
-    ByteArrayOutputStream byteArray = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArray);
-    try {
-      genericDataType.writeByteArray(object, dataOutputStream);
-      dataOutputStream.close();
-      row.update(byteArray.toByteArray(), index);
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(object + "", e);
-    }
-  }
-
-  @Override public void fillColumnCardinality(List<Integer> cardinality) {
-    genericDataType.fillCardinality(cardinality);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
deleted file mode 100644
index 8d4d5a3..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.devapi.BiDictionary;
-import org.apache.carbondata.core.devapi.DictionaryGenerationException;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
-import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
-import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DictionaryFieldConverterImpl.class.getName());
-
-  private BiDictionary<Integer, Object> dictionaryGenerator;
-
-  private int index;
-
-  private CarbonDimension carbonDimension;
-
-  private String nullFormat;
-
-  private Dictionary dictionary;
-
-  private DictionaryMessage dictionaryMessage;
-
-  private boolean isEmptyBadRecord;
-
-  public DictionaryFieldConverterImpl(DataField dataField,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
-      DictionaryClient client, boolean useOnePass, String storePath,
-      Map<Object, Integer> localCache, boolean isEmptyBadRecord) throws IOException {
-    this.index = index;
-    this.carbonDimension = (CarbonDimension) dataField.getColumn();
-    this.nullFormat = nullFormat;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-    DictionaryColumnUniqueIdentifier identifier =
-        new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
-            dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType(),
-            CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier));
-
-    // if one-pass load is used, use DictionaryServerClientDictionary
-    if (useOnePass) {
-      if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
-        dictionary = cache.get(identifier);
-      }
-      dictionaryMessage = new DictionaryMessage();
-      dictionaryMessage.setColumnName(dataField.getColumn().getColName());
-      // for table initialization
-      dictionaryMessage.setTableUniqueId(carbonTableIdentifier.getTableId());
-      dictionaryMessage.setData("0");
-      // for generating the dictionary
-      dictionaryMessage.setType(DictionaryMessageType.DICT_GENERATION);
-      dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
-          dictionaryMessage, localCache);
-    } else {
-      dictionary = cache.get(identifier);
-      dictionaryGenerator = new PreCreatedDictionary(dictionary);
-    }
-  }
-
-  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder)
-      throws CarbonDataLoadingException {
-    try {
-      String parsedValue = null;
-      String dimensionValue = row.getString(index);
-      if (dimensionValue == null || dimensionValue.equals(nullFormat)) {
-        parsedValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL;
-      } else {
-        parsedValue = DataTypeUtil.parseValue(dimensionValue, carbonDimension);
-      }
-      if (null == parsedValue) {
-        if ((dimensionValue.length() > 0) || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
-          String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName());
-          if (null == message) {
-            message = CarbonDataProcessorUtil.prepareFailureReason(
-                carbonDimension.getColName(), carbonDimension.getDataType());
-            logHolder.getColumnMessageMap().put(carbonDimension.getColName(), message);
-          }
-          logHolder.setReason(message);
-        }
-        row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_SURROGATE_KEY, index);
-      } else {
-        row.update(dictionaryGenerator.getOrGenerateKey(parsedValue), index);
-      }
-    } catch (DictionaryGenerationException e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  @Override
-  public void fillColumnCardinality(List<Integer> cardinality) {
-    cardinality.add(dictionaryGenerator.size());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
deleted file mode 100644
index f269274..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DirectDictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
-import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class DirectDictionaryFieldConverterImpl extends AbstractDictionaryFieldConverterImpl {
-
-  private DirectDictionaryGenerator directDictionaryGenerator;
-
-  private int index;
-
-  private String nullFormat;
-
-  private CarbonColumn column;
-  private boolean isEmptyBadRecord;
-
-  public DirectDictionaryFieldConverterImpl(DataField dataField, String nullFormat, int index,
-      boolean isEmptyBadRecord) {
-    this.nullFormat = nullFormat;
-    this.column = dataField.getColumn();
-    if (dataField.getDateFormat() != null && !dataField.getDateFormat().isEmpty()) {
-      this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-          .getDirectDictionaryGenerator(dataField.getColumn().getDataType(),
-              dataField.getDateFormat());
-
-    } else {
-      this.directDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
-          .getDirectDictionaryGenerator(dataField.getColumn().getDataType());
-    }
-    this.index = index;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-  }
-
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
-    String value = row.getString(index);
-    if (value == null) {
-      logHolder.setReason(
-          CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
-      row.update(1, index);
-    } else if (value.equals(nullFormat)) {
-      row.update(1, index);
-    } else {
-      int key = directDictionaryGenerator.generateDirectSurrogateKey(value);
-      if (key == 1) {
-        if ((value.length() > 0) || (value.length() == 0 && isEmptyBadRecord)) {
-          String message = logHolder.getColumnMessageMap().get(column.getColName());
-          if (null == message) {
-            message = CarbonDataProcessorUtil.prepareFailureReason(
-                column.getColName(), column.getDataType());
-            logHolder.getColumnMessageMap().put(column.getColName(), message);
-          }
-          logHolder.setReason(message);
-        }
-      }
-      row.update(key, index);
-    }
-  }
-
-  @Override
-  public void fillColumnCardinality(List<Integer> cardinality) {
-    cardinality.add(Integer.MAX_VALUE);
-  }
-}

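The deleted DirectDictionaryFieldConverterImpl maps date/timestamp values to surrogate keys without a lookup table, reserving key 1 for null or unparsable values. Below is a standalone sketch of that idea over java.time, with a simplified epoch cutoff and hypothetical names; it is not the CarbonData key generator.

import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoUnit;

// Minimal sketch of a direct dictionary: the value itself yields the key,
// so no lookup table is needed. Key 1 marks null or unparsable values.
public class DirectSurrogateSketch {
  private static final int NULL_SURROGATE = 1;
  private static final LocalDate CUTOFF = LocalDate.of(1970, 1, 1);

  static int generateDirectSurrogateKey(String value) {
    if (value == null || value.isEmpty()) {
      return NULL_SURROGATE;
    }
    try {
      long days = ChronoUnit.DAYS.between(CUTOFF, LocalDate.parse(value));
      return (int) days + 2;  // offset so valid keys never collide with 1
    } catch (DateTimeParseException e) {
      return NULL_SURROGATE;
    }
  }

  public static void main(String[] args) {
    System.out.println(generateDirectSurrogateKey("2017-10-10"));  // positive key
    System.out.println(generateDirectSurrogateKey("not-a-date"));  // 1
  }
}
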
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
deleted file mode 100644
index 1aada19..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.processing.datatypes.ArrayDataType;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
-import org.apache.carbondata.processing.datatypes.StructDataType;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-
-public class FieldEncoderFactory {
-
-  private static FieldEncoderFactory instance;
-
-  private FieldEncoderFactory() {
-
-  }
-
-  public static FieldEncoderFactory getInstance() {
-    if (instance == null) {
-      instance = new FieldEncoderFactory();
-    }
-    return instance;
-  }
-
-  /**
-   * Creates a FieldConverter for the given column: dimension columns get a
-   * dictionary, direct-dictionary, complex, or non-dictionary converter, and
-   * measure columns get a measure converter.
-   *
-   * @param dataField             column schema
-   * @param cache                 dictionary cache
-   * @param carbonTableIdentifier table identifier
-   * @param index                 index of column in the row
-   * @param isEmptyBadRecord
-   * @return
-   */
-  public FieldConverter createFieldEncoder(DataField dataField,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
-      DictionaryClient client, Boolean useOnePass, String storePath,
-      Map<Object, Integer> localCache, boolean isEmptyBadRecord)
-      throws IOException {
-    // Dimensions get dictionary/complex/non-dictionary converters; measures get a measure converter.
-    if (dataField.getColumn().isDimension()) {
-      if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
-          !dataField.getColumn().isComplex()) {
-        return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
-            isEmptyBadRecord);
-      } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
-          !dataField.getColumn().isComplex()) {
-        return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
-            index, client, useOnePass, storePath, localCache, isEmptyBadRecord);
-      } else if (dataField.getColumn().isComplex()) {
-        return new ComplexFieldConverterImpl(
-            createComplexType(dataField, cache, carbonTableIdentifier,
-                    client, useOnePass, storePath, localCache), index);
-      } else {
-        return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
-      }
-    } else {
-      return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
-    }
-  }
-
-  /**
-   * Create parser for the carbon column.
-   */
-  private static GenericDataType createComplexType(DataField dataField,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      String storePath, Map<Object, Integer> localCache) {
-    return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
-        carbonTableIdentifier, client, useOnePass, storePath, localCache);
-  }
-
-  /**
-   * This method may be called recursively if the carbon column is complex type.
-   *
-   * @return GenericDataType
-   */
-  private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
-      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, DictionaryClient client, Boolean useOnePass,
-      String storePath, Map<Object, Integer> localCache) {
-    switch (carbonColumn.getDataType()) {
-      case ARRAY:
-        List<CarbonDimension> listOfChildDimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create array parser with complex delimiter
-        ArrayDataType arrayDataType =
-            new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
-        for (CarbonDimension dimension : listOfChildDimensions) {
-          arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath, localCache));
-        }
-        return arrayDataType;
-      case STRUCT:
-        List<CarbonDimension> dimensions =
-            ((CarbonDimension) carbonColumn).getListOfChildDimensions();
-        // Create struct parser with complex delimiter
-        StructDataType structDataType =
-            new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
-        for (CarbonDimension dimension : dimensions) {
-          structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(), cache,
-              carbonTableIdentifier, client, useOnePass, storePath, localCache));
-        }
-        return structDataType;
-      case MAP:
-        throw new UnsupportedOperationException("Complex type Map is not supported yet");
-      default:
-        return new PrimitiveDataType(carbonColumn.getColName(), parentName,
-            carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
-            carbonTableIdentifier, client, useOnePass, storePath, localCache);
-    }
-  }
-}

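The deleted FieldEncoderFactory builds converters recursively for complex columns: ARRAY and STRUCT dimensions recurse into their children, and everything else becomes a primitive leaf. Below is a simplified standalone sketch of that recursion, using hypothetical types rather than the CarbonData schema classes.

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of recursive complex-type construction: ARRAY and STRUCT
// columns recurse into their children, everything else is a primitive leaf.
public class ComplexTypeSketch {
  enum Kind { ARRAY, STRUCT, PRIMITIVE }

  static class Column {
    final String name;
    final Kind kind;
    final List<Column> children = new ArrayList<>();
    Column(String name, Kind kind) { this.name = name; this.kind = kind; }
  }

  static String buildParser(Column column, String parentName) {
    switch (column.kind) {
      case ARRAY:
      case STRUCT:
        StringBuilder sb = new StringBuilder(column.kind + "(" + parentName + "." + column.name);
        for (Column child : column.children) {
          sb.append(", ").append(buildParser(child, column.name));
        }
        return sb.append(")").toString();
      default:
        return "PRIMITIVE(" + parentName + "." + column.name + ")";
    }
  }

  public static void main(String[] args) {
    Column address = new Column("address", Kind.STRUCT);
    address.children.add(new Column("city", Kind.PRIMITIVE));
    address.children.add(new Column("zip", Kind.PRIMITIVE));
    // prints: STRUCT(root.address, PRIMITIVE(address.city), PRIMITIVE(address.zip))
    System.out.println(buildParser(address, "root"));
  }
}
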
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
deleted file mode 100644
index 8e20b8f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/MeasureFieldConverterImpl.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * Converter for measure
- */
-public class MeasureFieldConverterImpl implements FieldConverter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(MeasureFieldConverterImpl.class.getName());
-
-  private int index;
-
-  private DataType dataType;
-
-  private CarbonMeasure measure;
-
-  private String nullformat;
-
-  private boolean isEmptyBadRecord;
-
-  public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
-      boolean isEmptyBadRecord) {
-    this.dataType = dataField.getColumn().getDataType();
-    this.measure = (CarbonMeasure) dataField.getColumn();
-    this.nullformat = nullformat;
-    this.index = index;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-  }
-
-  @Override
-  public void convert(CarbonRow row, BadRecordLogHolder logHolder)
-      throws CarbonDataLoadingException {
-    String value = row.getString(index);
-    Object output;
-    boolean isNull = CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(value);
-    if (value == null || isNull) {
-      String message = logHolder.getColumnMessageMap().get(measure.getColName());
-      if (null == message) {
-        message = CarbonDataProcessorUtil
-            .prepareFailureReason(measure.getColName(), measure.getDataType());
-        logHolder.getColumnMessageMap().put(measure.getColName(), message);
-      }
-      row.update(null, index);
-    } else if (value.length() == 0) {
-      if (isEmptyBadRecord) {
-        String message = logHolder.getColumnMessageMap().get(measure.getColName());
-        if (null == message) {
-          message = CarbonDataProcessorUtil
-              .prepareFailureReason(measure.getColName(), measure.getDataType());
-          logHolder.getColumnMessageMap().put(measure.getColName(), message);
-        }
-        logHolder.setReason(message);
-      }
-      row.update(null, index);
-    } else if (value.equals(nullformat)) {
-      row.update(null, index);
-    } else {
-      try {
-        output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
-        row.update(output, index);
-      } catch (NumberFormatException e) {
-        LOGGER.warn(
-            "Cant not convert value to Numeric type value. Value considered as null.");
-        logHolder.setReason(
-            CarbonDataProcessorUtil.prepareFailureReason(measure.getColName(), dataType));
-        output = null;
-        row.update(output, index);
-      }
-    }
-
-  }
-}

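The deleted MeasureFieldConverterImpl parses measure strings into typed values and, when parsing fails, records a bad-record reason and stores null. Below is a small standalone sketch of that fallback, with hypothetical names and plain Java parsing.

// Minimal sketch of measure conversion: parse the string into the target type,
// fall back to null when parsing fails, mirroring the bad-record handling above.
public class MeasureParseSketch {
  static Object parseMeasure(String value, boolean isLongType) {
    if (value == null || value.isEmpty()) {
      return null;
    }
    try {
      return isLongType ? (Object) Long.parseLong(value) : (Object) Double.parseDouble(value);
    } catch (NumberFormatException e) {
      System.err.println("Cannot convert value to numeric type. Value considered as null.");
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println(parseMeasure("12.5", false));  // 12.5
    System.out.println(parseMeasure("abc", false));   // null
  }
}
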
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
deleted file mode 100644
index 4861d78..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/NonDictionaryFieldConverterImpl.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-public class NonDictionaryFieldConverterImpl implements FieldConverter {
-
-  private DataType dataType;
-
-  private int index;
-
-  private String nullformat;
-
-  private CarbonColumn column;
-
-  private boolean isEmptyBadRecord;
-
-  private DataField dataField;
-
-  public NonDictionaryFieldConverterImpl(DataField dataField, String nullformat, int index,
-      boolean isEmptyBadRecord) {
-    this.dataField = dataField;
-    this.dataType = dataField.getColumn().getDataType();
-    this.column = dataField.getColumn();
-    this.index = index;
-    this.nullformat = nullformat;
-    this.isEmptyBadRecord = isEmptyBadRecord;
-  }
-
-  @Override public void convert(CarbonRow row, BadRecordLogHolder logHolder) {
-    String dimensionValue = row.getString(index);
-    if (null == dimensionValue && column.getDataType() != DataType.STRING) {
-      logHolder.setReason(
-          CarbonDataProcessorUtil.prepareFailureReason(column.getColName(), column.getDataType()));
-      updateWithNullValue(row);
-    } else if (dimensionValue == null || dimensionValue.equals(nullformat)) {
-      updateWithNullValue(row);
-    } else {
-      try {
-        row.update(DataTypeUtil
-            .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType,
-                dataField.getDateFormat()), index);
-      } catch (Throwable ex) {
-        if (dimensionValue.length() > 0 || (dimensionValue.length() == 0 && isEmptyBadRecord)) {
-          String message = logHolder.getColumnMessageMap().get(column.getColName());
-          if (null == message) {
-            message = CarbonDataProcessorUtil
-                .prepareFailureReason(column.getColName(), column.getDataType());
-            logHolder.getColumnMessageMap().put(column.getColName(), message);
-          }
-          logHolder.setReason(message);
-          updateWithNullValue(row);
-        } else {
-          updateWithNullValue(row);
-        }
-      }
-    }
-  }
-
-  private void updateWithNullValue(CarbonRow row) {
-    if (dataType == DataType.STRING) {
-      row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
-    } else {
-      row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index);
-    }
-  }
-}

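The deleted NonDictionaryFieldConverterImpl stores no-dictionary dimension values as byte arrays and substitutes an empty array (or a default-member marker for strings) when the value is null or matches the null format. Below is a tiny standalone sketch of that encoding, with hypothetical names.

import java.nio.charset.StandardCharsets;

// Minimal sketch: no-dictionary values are stored as UTF-8 bytes; null values
// (or values equal to the null format) become an empty byte array.
public class NoDictionaryBytesSketch {
  private static final byte[] EMPTY = new byte[0];

  static byte[] toBytes(String value, String nullFormat) {
    if (value == null || value.equals(nullFormat)) {
      return EMPTY;
    }
    return value.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(toBytes("carbon", "\\N").length);  // 6
    System.out.println(toBytes("\\N", "\\N").length);     // 0
  }
}
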
http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
deleted file mode 100644
index eecb0e9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.converter.impl;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.dictionary.client.DictionaryClient;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
-import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
-import org.apache.carbondata.processing.newflow.converter.FieldConverter;
-import org.apache.carbondata.processing.newflow.converter.RowConverter;
-import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
-
-/**
- * Converts the complete row if necessary: dictionary columns are encoded with dictionary
- * surrogate values, and non-dictionary values are converted to binary.
- */
-public class RowConverterImpl implements RowConverter {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RowConverterImpl.class.getName());
-
-  private CarbonDataLoadConfiguration configuration;
-
-  private DataField[] fields;
-
-  private FieldConverter[] fieldConverters;
-
-  private BadRecordsLogger badRecordLogger;
-
-  private BadRecordLogHolder logHolder;
-
-  private List<DictionaryClient> dictClients = new ArrayList<>();
-
-  private ExecutorService executorService;
-
-  private Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache;
-
-  private Map<Object, Integer>[] localCaches;
-
-  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
-      BadRecordsLogger badRecordLogger) {
-    this.fields = fields;
-    this.configuration = configuration;
-    this.badRecordLogger = badRecordLogger;
-  }
-
-  @Override
-  public void initialize() throws IOException {
-    CacheProvider cacheProvider = CacheProvider.getInstance();
-    cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
-        configuration.getTableIdentifier().getStorePath());
-    String nullFormat =
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
-            .toString();
-    boolean isEmptyBadRecord = Boolean.parseBoolean(
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
-            .toString());
-    List<FieldConverter> fieldConverterList = new ArrayList<>();
-    localCaches = new Map[fields.length];
-    long lruCacheStartTime = System.currentTimeMillis();
-    DictionaryClient client = createDictionaryClient();
-    dictClients.add(client);
-
-    for (int i = 0; i < fields.length; i++) {
-      localCaches[i] = new ConcurrentHashMap<>();
-      FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
-          .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
-              configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
-              localCaches[i], isEmptyBadRecord);
-      fieldConverterList.add(fieldConverter);
-    }
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
-    fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
-    logHolder = new BadRecordLogHolder();
-  }
-
-  private DictionaryClient createDictionaryClient() {
-    // for one pass load, start the dictionary client
-    if (configuration.getUseOnePass()) {
-      if (executorService == null) {
-        executorService = Executors.newCachedThreadPool();
-      }
-      Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
-        @Override
-        public DictionaryClient call() throws Exception {
-          Thread.currentThread().setName("Dictionary client");
-          DictionaryClient dictionaryClient = new DictionaryClient();
-          dictionaryClient.startClient(configuration.getDictionaryServerHost(),
-              configuration.getDictionaryServerPort());
-          return dictionaryClient;
-        }
-      });
-
-      try {
-        // wait for client initialization to finish, otherwise a NullPointerException may be raised
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-        LOGGER.error(e);
-        throw new RuntimeException(e);
-      }
-
-      try {
-        return result.get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return null;
-  }
-
-  @Override
-  public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
-    // TODO: only copy the row if it is a bad record
-    CarbonRow copy = row.getCopy();
-    logHolder.setLogged(false);
-    logHolder.clear();
-    for (int i = 0; i < fieldConverters.length; i++) {
-      fieldConverters[i].convert(row, logHolder);
-      if (!logHolder.isLogged() && logHolder.isBadRecordNotAdded()) {
-        badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
-        if (badRecordLogger.isDataLoadFail()) {
-          String error = "Data load failed due to bad record: " + logHolder.getReason() +
-              "Please enable bad record logger to know the detail reason.";
-          throw new BadRecordFoundException(error);
-        }
-        logHolder.clear();
-        logHolder.setLogged(true);
-        if (badRecordLogger.isBadRecordConvertNullDisable()) {
-          return null;
-        }
-      }
-    }
-    return row;
-  }
-
-  @Override
-  public void finish() {
-    // close the dictionary client when the write finishes
-    if (configuration.getUseOnePass()) {
-      for (DictionaryClient client : dictClients) {
-        if (client != null) {
-          client.shutDown();
-        }
-      }
-      if (null != logHolder) {
-        logHolder.finish();
-      }
-      if (executorService != null) {
-        executorService.shutdownNow();
-        executorService = null;
-      }
-    }
-  }
-
-  @Override
-  public RowConverter createCopyForNewThread() {
-    RowConverterImpl converter =
-        new RowConverterImpl(this.fields, this.configuration, this.badRecordLogger);
-    List<FieldConverter> fieldConverterList = new ArrayList<>();
-    DictionaryClient client = createDictionaryClient();
-    dictClients.add(client);
-    String nullFormat =
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
-            .toString();
-    boolean isEmptyBadRecord = Boolean.parseBoolean(
-        configuration.getDataLoadProperty(DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD)
-            .toString());
-    for (int i = 0; i < fields.length; i++) {
-      FieldConverter fieldConverter = null;
-      try {
-        fieldConverter = FieldEncoderFactory.getInstance().createFieldEncoder(fields[i], cache,
-            configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat, client,
-            configuration.getUseOnePass(), configuration.getTableIdentifier().getStorePath(),
-            localCaches[i], isEmptyBadRecord);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      fieldConverterList.add(fieldConverter);
-    }
-    converter.fieldConverters =
-        fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
-    converter.logHolder = new BadRecordLogHolder();
-    return converter;
-  }
-
-  @Override public int[] getCardinality() {
-    List<Integer> dimCardinality = new ArrayList<>();
-    if (fieldConverters != null) {
-      for (int i = 0; i < fieldConverters.length; i++) {
-        if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
-          ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
-              .fillColumnCardinality(dimCardinality);
-        }
-      }
-    }
-    int[] cardinality = new int[dimCardinality.size()];
-    for (int i = 0; i < dimCardinality.size(); i++) {
-      cardinality[i] = dimCardinality.get(i);
-    }
-    return cardinality;
-  }
-}
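
The deleted RowConverterImpl applies every field converter in column order and, when a converter flags a bad record, either fails the load or drops the row depending on the bad-record settings. Below is a much-simplified standalone sketch of that control flow, with hypothetical types and without the logging path.

import java.util.Arrays;
import java.util.List;

// Minimal sketch of the per-row conversion loop: converters run in column order;
// the first bad record either fails the load or drops the row.
public class RowConvertSketch {
  interface FieldConverter {
    // returns null when the value converted cleanly, otherwise a failure reason
    String convert(Object[] row, int index);
  }

  static Object[] convert(Object[] row, List<FieldConverter> converters, boolean failOnBadRecord) {
    for (int i = 0; i < converters.size(); i++) {
      String reason = converters.get(i).convert(row, i);
      if (reason != null) {
        if (failOnBadRecord) {
          throw new RuntimeException("Data load failed due to bad record: " + reason);
        }
        return null;  // drop the row instead of failing the load
      }
    }
    return row;
  }

  public static void main(String[] args) {
    FieldConverter intConverter = (row, idx) -> {
      try { row[idx] = Integer.parseInt((String) row[idx]); return null; }
      catch (NumberFormatException e) { return "not a number at column " + idx; }
    };
    System.out.println(convert(new Object[]{"42"}, Arrays.asList(intConverter), false)[0]);  // 42
    System.out.println(convert(new Object[]{"abc"}, Arrays.asList(intConverter), false));    // null
  }
}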