Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:20 UTC

[33/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
new file mode 100644
index 0000000..1e73867
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Replaces row data fields with dictionary values if the column is configured as
+ * dictionary encoded. Non-dictionary columns as well as complex columns are converted to byte[].
+ */
+public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private List<RowConverter> converters;
+  private BadRecordsLogger badRecordLogger;
+
+  public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    super.initialize();
+    child.initialize();
+    converters = new ArrayList<>();
+    badRecordLogger = createBadRecordLogger(configuration);
+    RowConverter converter =
+        new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    configuration.setCardinalityFinder(converter);
+    converters.add(converter);
+    converter.initialize();
+  }
+
+  /**
+   * Creates the iterator using the child iterator.
+   *
+   * @param childIter child iterator
+   * @return new iterator with step-specific processing
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      private boolean first = true;
+      private RowConverter localConverter;
+      @Override public boolean hasNext() {
+        if (first) {
+          first = false;
+          localConverter = converters.get(0).createCopyForNewThread();
+          synchronized (converters) {
+            converters.add(localConverter);
+          }
+        }
+        return childIter.hasNext();
+      }
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Processes the batch of rows as per the step logic.
+   *
+   * @param rowBatch input row batch
+   * @return processed row batch
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+    while (rowBatch.hasNext()) {
+      newBatch.addRow(localConverter.convert(rowBatch.next()));
+    }
+    rowCounter.getAndAdd(newBatch.getSize());
+    return newBatch;
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    throw new UnsupportedOperationException();
+  }
+
+  public static BadRecordsLogger createBadRecordLogger(CarbonDataLoadConfiguration configuration) {
+    boolean badRecordsLogRedirect = false;
+    boolean badRecordConvertNullDisable = false;
+    boolean isDataLoadFail = false;
+    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    Object badRecordsAction =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION);
+    if (null != badRecordsAction) {
+      LoggerAction loggerAction = null;
+      try {
+        loggerAction = LoggerAction.valueOf(badRecordsAction.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        loggerAction = LoggerAction.FORCE;
+      }
+      switch (loggerAction) {
+        case FORCE:
+          badRecordConvertNullDisable = false;
+          break;
+        case REDIRECT:
+          badRecordsLogRedirect = true;
+          badRecordConvertNullDisable = true;
+          break;
+        case IGNORE:
+          badRecordsLogRedirect = false;
+          badRecordConvertNullDisable = true;
+          break;
+        case FAIL:
+          isDataLoadFail = true;
+          break;
+      }
+    }
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+        identifier.getTableName() + '_' + System.currentTimeMillis(),
+        getBadLogStoreLocation(configuration,
+            identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+                .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration
+                .getSegmentId() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+        badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+  }
+
+  public static String getBadLogStoreLocation(CarbonDataLoadConfiguration configuration,
+      String storeLocation) {
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    return badLogStoreLocation;
+  }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      if (null != badRecordLogger) {
+        badRecordLogger.closeStreams();
+        renameBadRecord(badRecordLogger, configuration);
+      }
+      super.close();
+      if (converters != null) {
+        for (RowConverter converter : converters) {
+          if (null != converter) {
+            converter.finish();
+          }
+        }
+      }
+    }
+  }
+
+  public static void close(BadRecordsLogger badRecordLogger, CarbonDataLoadConfiguration
+      configuration, RowConverter converter) {
+    if (badRecordLogger != null) {
+      badRecordLogger.closeStreams();
+      renameBadRecord(badRecordLogger, configuration);
+    }
+    if (converter != null) {
+      converter.finish();
+    }
+  }
+
+  private static void renameBadRecord(BadRecordsLogger badRecordLogger,
+      CarbonDataLoadConfiguration configuration) {
+    // rename operation should be performed only when either bad records logging
+    // or bad records redirect is enabled
+    if (badRecordLogger.isBadRecordLoggerEnable() || badRecordLogger.isBadRecordsLogRedirect()) {
+      // rename the bad record in progress to normal
+      CarbonTableIdentifier identifier =
+          configuration.getTableIdentifier().getCarbonTableIdentifier();
+      CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+          identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+              .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+              + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo());
+    }
+  }
+
+  @Override protected String getStepName() {
+    return "Data Converter";
+  }
+}
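
The per-thread converter copy in getIterator() above is a lazy-initialization pattern: each consuming thread builds its own converter on first use and registers it in a shared list so that close() can finish every copy. Below is a minimal, self-contained sketch of the same pattern; the Converter interface and all names are illustrative stand-ins, not CarbonData APIs.

    import java.util.Iterator;
    import java.util.List;

    // Illustrative stand-in for RowConverter: copied per thread, finished in close().
    interface Converter {
      Converter copyForNewThread();
      String convert(String row);
      void finish();
    }

    class LazyPerThreadIterator implements Iterator<String> {
      private final Iterator<String> childIter;
      private final List<Converter> converters; // shared registry of all thread-local copies
      private Converter localConverter;         // created lazily on first hasNext()
      private boolean first = true;

      LazyPerThreadIterator(Iterator<String> childIter, List<Converter> converters) {
        this.childIter = childIter;
        this.converters = converters;
      }

      @Override public boolean hasNext() {
        if (first) {
          first = false;
          localConverter = converters.get(0).copyForNewThread();
          synchronized (converters) { // registry is mutated from multiple threads
            converters.add(localConverter);
          }
        }
        return childIter.hasNext();
      }

      @Override public String next() {
        return localConverter.convert(childIter.next());
      }
    }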

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
new file mode 100644
index 0000000..009c6a0
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.BadRecordsLogger;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.converter.RowConverter;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.partition.Partitioner;
+import org.apache.carbondata.processing.loading.partition.impl.HashPartitionerImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * Replaces row data fields with dictionary values if the column is configured as
+ * dictionary encoded. Non-dictionary columns as well as complex columns are converted to byte[].
+ */
+public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep {
+
+  private List<RowConverter> converters;
+
+  private Partitioner<Object[]> partitioner;
+
+  private BadRecordsLogger badRecordLogger;
+
+  public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    super.initialize();
+    child.initialize();
+    converters = new ArrayList<>();
+    badRecordLogger = createBadRecordLogger();
+    RowConverter converter =
+        new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    configuration.setCardinalityFinder(converter);
+    converters.add(converter);
+    converter.initialize();
+    List<Integer> indexes = new ArrayList<>();
+    List<ColumnSchema> columnSchemas = new ArrayList<>();
+    DataField[] inputDataFields = getOutput();
+    BucketingInfo bucketingInfo = configuration.getBucketingInfo();
+    for (int i = 0; i < inputDataFields.length; i++) {
+      for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) {
+        if (inputDataFields[i].getColumn().getColName()
+            .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) {
+          indexes.add(i);
+          columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema());
+          break;
+        }
+      }
+    }
+    partitioner =
+        new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets());
+  }
+
+  /**
+   * Creates the iterator using the child iterator.
+   *
+   * @param childIter child iterator
+   * @return new iterator with step-specific processing
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      RowConverter localConverter;
+      private boolean first = true;
+      @Override public boolean hasNext() {
+        if (first) {
+          first = false;
+          localConverter = converters.get(0).createCopyForNewThread();
+          synchronized (converters) {
+            converters.add(localConverter);
+          }
+        }
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Processes the batch of rows as per the step logic.
+   *
+   * @param rowBatch input row batch
+   * @return processed row batch
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+    while (rowBatch.hasNext()) {
+      CarbonRow next = rowBatch.next();
+      short bucketNumber = (short) partitioner.getPartition(next.getData());
+      CarbonRow convertRow = localConverter.convert(next);
+      convertRow.bucketNumber = bucketNumber;
+      newBatch.addRow(convertRow);
+    }
+    rowCounter.getAndAdd(newBatch.getSize());
+    return newBatch;
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    throw new UnsupportedOperationException();
+  }
+
+  private BadRecordsLogger createBadRecordLogger() {
+    boolean badRecordsLogRedirect = false;
+    boolean badRecordConvertNullDisable = false;
+    boolean isDataLoadFail = false;
+    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    Object badRecordsAction =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION);
+    if (null != badRecordsAction) {
+      LoggerAction loggerAction = null;
+      try {
+        loggerAction = LoggerAction.valueOf(badRecordsAction.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        loggerAction = LoggerAction.FORCE;
+      }
+      switch (loggerAction) {
+        case FORCE:
+          badRecordConvertNullDisable = false;
+          break;
+        case REDIRECT:
+          badRecordsLogRedirect = true;
+          badRecordConvertNullDisable = true;
+          break;
+        case IGNORE:
+          badRecordsLogRedirect = false;
+          badRecordConvertNullDisable = true;
+          break;
+        case FAIL:
+          isDataLoadFail = true;
+          break;
+      }
+    }
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+        identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation(
+        identifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + identifier
+            .getTableName() + CarbonCommonConstants.FILE_SEPARATOR + configuration.getSegmentId()
+            + CarbonCommonConstants.FILE_SEPARATOR + configuration.getTaskNo()),
+        badRecordsLogRedirect, badRecordsLoggerEnable, badRecordConvertNullDisable, isDataLoadFail);
+  }
+
+  private String getBadLogStoreLocation(String storeLocation) {
+    String badLogStoreLocation = (String) configuration
+        .getDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH);
+    if (null == badLogStoreLocation) {
+      badLogStoreLocation =
+          CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    }
+    badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation;
+
+    return badLogStoreLocation;
+  }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      super.close();
+      if (null != badRecordLogger) {
+        badRecordLogger.closeStreams();
+        renameBadRecord(configuration);
+      }
+      if (converters != null) {
+        for (RowConverter converter : converters) {
+          converter.finish();
+        }
+      }
+    }
+  }
+  private static void renameBadRecord(CarbonDataLoadConfiguration configuration) {
+    // rename the bad record in progress to normal
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal(configuration,
+        identifier.getDatabaseName() + File.separator + identifier.getTableName()
+            + File.separator + configuration.getSegmentId() + File.separator + configuration
+            .getTaskNo());
+  }
+  @Override protected String getStepName() {
+    return "Data Converter with Bucketing";
+  }
+}
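
The bucketing variant assigns each row a bucket number by hashing the values of the configured bucket columns before conversion. Below is a minimal sketch of that kind of hash partitioning, assuming an Object[] row layout; the names are illustrative, not the actual HashPartitionerImpl.

    import java.util.Arrays;
    import java.util.List;

    // Hashes a row to a bucket using the values at the bucket-column indexes.
    class SimpleHashPartitioner {
      private final List<Integer> indexes; // positions of the bucket columns in the row
      private final int numberOfBuckets;

      SimpleHashPartitioner(List<Integer> indexes, int numberOfBuckets) {
        this.indexes = indexes;
        this.numberOfBuckets = numberOfBuckets;
      }

      int getPartition(Object[] row) {
        Object[] key = new Object[indexes.size()];
        for (int i = 0; i < key.length; i++) {
          key[i] = row[indexes.get(i)];
        }
        // floorMod keeps the bucket non-negative even for negative hash codes.
        return Math.floorMod(Arrays.hashCode(key), numberOfBuckets);
      }
    }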

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
new file mode 100644
index 0000000..f030d52
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from a batch of sorted files (in-memory or disk based) generated in
+ * the previous sort step, writes the data to a carbondata file, and generates the
+ * MDK key while writing.
+ */
+public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());
+
+  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override public void initialize() throws IOException {
+    super.initialize();
+    child.initialize();
+  }
+
+  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
+            configuration.getSegmentId() + "", false, false);
+    CarbonDataProcessorUtil.createLocations(storeLocation);
+    return storeLocation;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] iterators = child.execute();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String tableName = tableIdentifier.getTableName();
+    try {
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+              System.currentTimeMillis());
+      int i = 0;
+      for (Iterator<CarbonRowBatch> iterator : iterators) {
+        String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+        int k = 0;
+        while (iterator.hasNext()) {
+          CarbonRowBatch next = iterator.next();
+          // If there are no rows from the merge sorter, don't create a file in the fact handler
+          if (next.hasNext()) {
+            CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+                .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+            CarbonFactHandler dataHandler = CarbonFactHandlerFactory
+                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+            dataHandler.initialise();
+            processBatch(next, dataHandler);
+            finish(tableName, dataHandler);
+          }
+        }
+        i++;
+      }
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterBatchProcessorStepImpl");
+      if (e.getCause() instanceof BadRecordFoundException) {
+        throw new BadRecordFoundException(e.getCause().getMessage());
+      }
+      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
+    }
+    return null;
+  }
+
+  @Override protected String getStepName() {
+    return "Data Batch Writer";
+  }
+
+  private void finish(String tableName, CarbonFactHandler dataHandler) {
+    try {
+      dataHandler.finish();
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+    }
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
+    processingComplete(dataHandler);
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+            System.currentTimeMillis());
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+  }
+
+  private void processingComplete(CarbonFactHandler dataHandler) {
+    if (null != dataHandler) {
+      try {
+        dataHandler.closeHandler();
+      } catch (Exception e) {
+        LOGGER.error(e);
+        throw new CarbonDataLoadingException(
+            "There is an unexpected error while closing data handler", e);
+      }
+    }
+  }
+
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) throws Exception {
+    int batchSize = 0;
+    while (batch.hasNext()) {
+      CarbonRow row = batch.next();
+      dataHandler.addDataToStore(row);
+      batchSize++;
+    }
+    batch.close();
+    rowCounter.getAndAdd(batchSize);
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
new file mode 100644
index 0000000..e8e2b0e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from sorted files which are generated in the previous sort step,
+ * writes the data to a carbondata file, and generates the MDK key while writing.
+ */
+public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
+
+  private long readCounter;
+
+  public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) {
+    super(configuration, null);
+  }
+
+  @Override public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override public void initialize() throws IOException {
+    super.initialize();
+    child.initialize();
+  }
+
+  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
+            configuration.getSegmentId() + "", false, false);
+    CarbonDataProcessorUtil.createLocations(storeLocation);
+    return storeLocation;
+  }
+
+  public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) {
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
+    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+        .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0);
+    return model;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] iterators = child.execute();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String tableName = tableIdentifier.getTableName();
+    try {
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+              System.currentTimeMillis());
+      int i = 0;
+      for (Iterator<CarbonRowBatch> iterator : iterators) {
+        String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
+
+        CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
+            .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0);
+        CarbonFactHandler dataHandler = null;
+        boolean rowsNotExist = true;
+        while (iterator.hasNext()) {
+          if (rowsNotExist) {
+            rowsNotExist = false;
+            dataHandler = CarbonFactHandlerFactory
+                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+            dataHandler.initialise();
+          }
+          processBatch(iterator.next(), dataHandler);
+        }
+        if (!rowsNotExist) {
+          finish(dataHandler);
+        }
+        i++;
+      }
+
+    } catch (CarbonDataWriterException e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+      throw new CarbonDataLoadingException(
+          "Error while initializing data handler : " + e.getMessage());
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
+    }
+    return null;
+  }
+
+  @Override protected String getStepName() {
+    return "Data Writer";
+  }
+
+  public void finish(CarbonFactHandler dataHandler) {
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String tableName = tableIdentifier.getTableName();
+
+    try {
+      dataHandler.finish();
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+    }
+    LOGGER.info("Record Processed For table: " + tableName);
+    String logMessage =
+        "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
+            + rowCounter.get();
+    LOGGER.info(logMessage);
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
+    processingComplete(dataHandler);
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+            System.currentTimeMillis());
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+  }
+
+  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
+    if (null != dataHandler) {
+      try {
+        dataHandler.closeHandler();
+      } catch (CarbonDataWriterException e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonDataLoadingException(e.getMessage(), e);
+      } catch (Exception e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
+      throws CarbonDataLoadingException {
+    try {
+      while (batch.hasNext()) {
+        CarbonRow row = batch.next();
+        dataHandler.addDataToStore(row);
+        readCounter++;
+      }
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    rowCounter.getAndAdd(batch.getSize());
+  }
+
+  public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
+    try {
+      readCounter++;
+      dataHandler.addDataToStore(row);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+    }
+    rowCounter.getAndAdd(1);
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+}
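
Both writer steps follow the same handler lifecycle: create the model, initialise the handler, add rows, finish, then close the handler even if finishing fails. Below is a hedged sketch of that lifecycle against an illustrative Handler interface, not the real CarbonFactHandler API.

    // Illustrative lifecycle wrapper; Handler stands in for CarbonFactHandler.
    interface Handler {
      void initialise() throws Exception;
      void addRow(Object[] row) throws Exception;
      void finish() throws Exception;       // flush remaining data and write footers
      void closeHandler() throws Exception; // release resources
    }

    final class HandlerRunner {
      static void writeAll(Handler handler, Iterable<Object[]> rows) throws Exception {
        handler.initialise();
        try {
          for (Object[] row : rows) {
            handler.addRow(row);
          }
          handler.finish();
        } finally {
          handler.closeHandler(); // always close, even when finish() throws
        }
      }
    }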

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
new file mode 100644
index 0000000..70a1254
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepImpl.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.parser.RowParser;
+import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+
+/**
+ * It reads data from the record reader and sends it to the next step.
+ */
+public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private RowParser rowParser;
+
+  private CarbonIterator<Object[]>[] inputIterators;
+
+  /**
+   * executor service used to prefetch input batches
+   */
+  public ExecutorService executorService;
+
+  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      CarbonIterator<Object[]>[] inputIterators) {
+    super(configuration, null);
+    this.inputIterators = inputIterators;
+  }
+
+  @Override public DataField[] getOutput() {
+    return configuration.getDataFields();
+  }
+
+  @Override public void initialize() throws IOException {
+    super.initialize();
+    rowParser = new RowParserImpl(getOutput(), configuration);
+    executorService = Executors.newCachedThreadPool();
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+    for (int i = 0; i < outIterators.length; i++) {
+      outIterators[i] =
+          new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
+              configuration.isPreFetch(), executorService, rowCounter);
+    }
+    return outIterators;
+  }
+
+  /**
+   * Partitions the input iterators equally as per the number of threads.
+   * @return input iterators grouped into one list per thread
+   */
+  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+    // Get the number of cores configured in property.
+    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    // Get the minimum of number of cores and iterators size to get the number of parallel threads
+    // to be launched.
+    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+
+    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+    for (int i = 0; i < parallelThreadNumber; i++) {
+      iterators[i] = new ArrayList<>();
+    }
+    // Equally partition the iterators as per number of threads
+    for (int i = 0; i < inputIterators.length; i++) {
+      iterators[i % parallelThreadNumber].add(inputIterators[i]);
+    }
+    return iterators;
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      executorService.shutdown();
+      for (CarbonIterator inputIterator : inputIterators) {
+        inputIterator.close();
+      }
+    }
+  }
+
+  @Override protected String getStepName() {
+    return "Input Processor";
+  }
+
+  /**
+   * This iterator wraps a list of iterators and iterates over each of them one by one.
+   * It also parses the data while iterating.
+   */
+  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private List<CarbonIterator<Object[]>> inputIterators;
+
+    private CarbonIterator<Object[]> currentIterator;
+
+    private int counter;
+
+    private int batchSize;
+
+    private RowParser rowParser;
+
+    private Future<CarbonRowBatch> future;
+
+    private ExecutorService executorService;
+
+    private boolean nextBatch;
+
+    private boolean firstTime;
+
+    private boolean preFetch;
+
+    private AtomicLong rowCounter;
+
+    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
+        RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService,
+        AtomicLong rowCounter) {
+      this.inputIterators = inputIterators;
+      this.batchSize = batchSize;
+      this.rowParser = rowParser;
+      this.counter = 0;
+      // Get the first iterator from the list.
+      currentIterator = inputIterators.get(counter++);
+      this.executorService = executorService;
+      this.rowCounter = rowCounter;
+      this.preFetch = preFetch;
+      this.nextBatch = false;
+      this.firstTime = true;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextBatch || internalHasNext();
+    }
+
+    private boolean internalHasNext() {
+      if (firstTime) {
+        firstTime = false;
+        currentIterator.initialize();
+      }
+      boolean hasNext = currentIterator.hasNext();
+      // If iterator is finished then check for next iterator.
+      if (!hasNext) {
+        currentIterator.close();
+        // Check next iterator is available in the list.
+        if (counter < inputIterators.size()) {
+          // Get the next iterator from the list.
+          currentIterator = inputIterators.get(counter++);
+          // Initialize the new iterator
+          currentIterator.initialize();
+          hasNext = internalHasNext();
+        }
+      }
+      return hasNext;
+    }
+
+    @Override
+    public CarbonRowBatch next() {
+      if (preFetch) {
+        return getCarbonRowBatchWithPreFetch();
+      } else {
+        return getBatch();
+      }
+    }
+
+    private CarbonRowBatch getCarbonRowBatchWithPreFetch() {
+      CarbonRowBatch result = null;
+      if (future == null) {
+        future = getCarbonRowBatch();
+      }
+      try {
+        result = future.get();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
+      }
+      nextBatch = false;
+      if (hasNext()) {
+        nextBatch = true;
+        future = getCarbonRowBatch();
+      }
+
+      return result;
+    }
+
+    private Future<CarbonRowBatch> getCarbonRowBatch() {
+      return executorService.submit(new Callable<CarbonRowBatch>() {
+        @Override public CarbonRowBatch call() throws Exception {
+          return getBatch();
+
+        }
+      });
+    }
+
+    private CarbonRowBatch getBatch() {
+      // Create batch and fill it.
+      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
+      int count = 0;
+      while (internalHasNext() && count < batchSize) {
+        carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
+        count++;
+      }
+      rowCounter.getAndAdd(carbonRowBatch.getSize());
+      return carbonRowBatch;
+    }
+  }
+
+}
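
The prefetch path in InputProcessorIterator is a single-slot double buffer: while the caller consumes batch N, a Future computes batch N+1 on the executor. Below is a stripped-down sketch of the same idea with illustrative names.

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.function.Supplier;

    // Produces values on a background thread, one step ahead of the consumer.
    class PrefetchingSupplier<T> {
      private final Supplier<T> producer;
      private final ExecutorService executor;
      private Future<T> future; // the batch currently being computed ahead of time

      PrefetchingSupplier(Supplier<T> producer, ExecutorService executor) {
        this.producer = producer;
        this.executor = executor;
      }

      T next(boolean hasMore) throws InterruptedException, ExecutionException {
        if (future == null) {
          future = executor.submit(producer::get); // first call: nothing prefetched yet
        }
        T result = future.get();
        // Kick off the next batch only while the source still has data.
        future = hasMore ? executor.submit(producer::get) : null;
        return result;
      }
    }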

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java
new file mode 100644
index 0000000..856d68c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/SortProcessorStepImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.loading.sort.Sorter;
+import org.apache.carbondata.processing.loading.sort.SorterFactory;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+/**
+ * It sorts the data and writes it to intermediate temp files. These files are further
+ * read by the next step for writing to carbondata files.
+ */
+public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private Sorter sorter;
+
+  public SortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  @Override
+  public void initialize() throws IOException {
+    super.initialize();
+    child.initialize();
+    SortParameters sortParameters = SortParameters.createSortParameters(configuration);
+    sorter = SorterFactory.createSorter(configuration, rowCounter);
+    sorter.initialize(sortParameters);
+  }
+
+  @Override
+  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    final Iterator<CarbonRowBatch>[] iterators = child.execute();
+    return sorter.sort(iterators);
+  }
+
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  @Override
+  public void close() {
+    if (!closed) {
+      super.close();
+      if (sorter != null) {
+        sorter.close();
+      }
+    }
+  }
+
+  @Override protected String getStepName() {
+    return "Sort Processor";
+  }
+}
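
Taken together, these step classes form a decorator-style pipeline: each step keeps a reference to its child and transforms the child's iterators inside execute(). Below is a minimal sketch of that chaining with illustrative step names.

    import java.util.Iterator;

    // Each step wraps a child step and transforms the child's iterator.
    abstract class Step {
      protected final Step child;
      Step(Step child) { this.child = child; }
      abstract Iterator<String> execute();
    }

    class InputStep extends Step {
      private final Iterator<String> source;
      InputStep(Iterator<String> source) { super(null); this.source = source; }
      @Override Iterator<String> execute() { return source; }
    }

    class UppercaseStep extends Step { // stand-in for a convert/sort/write step
      UppercaseStep(Step child) { super(child); }
      @Override Iterator<String> execute() {
        final Iterator<String> it = child.execute();
        return new Iterator<String>() {
          @Override public boolean hasNext() { return it.hasNext(); }
          @Override public String next() { return it.next().toUpperCase(); }
        };
      }
    }
    // Wiring mirrors the child chaining above:
    // Step pipeline = new UppercaseStep(new InputStep(rows.iterator()));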

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index f76c66f..aa77fb6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 838e5be..8631e1d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -52,7 +52,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 
 /**
  * utility class for load merging.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 095e5a3..edffae9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -33,15 +33,15 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sort.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 import org.apache.carbondata.processing.store.CarbonFactHandler;
 import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
-import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.spark.sql.types.Decimal;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index ef53163..f82f365 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -34,8 +34,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.exception.SliceMergerException;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
 import org.apache.carbondata.processing.store.CarbonFactHandler;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java b/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
deleted file mode 100644
index 3ae3604..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/exeception/SliceMergerException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.merger.exeception;
-
-import java.util.Locale;
-
-public class SliceMergerException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public SliceMergerException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public SliceMergerException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object that represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}
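
The exception class itself is unchanged by this commit; only its package moves
from processing.merger.exeception to processing.exception, as the import hunk
above shows. For reference, a minimal sketch of the wrap-and-rethrow pattern
this exception supports; the doMerge() call is hypothetical, not a method in
this module:

    import org.apache.carbondata.processing.exception.SliceMergerException;

    public class MergeSketch {
      /** Wrap a lower-level failure so callers see a merger-specific exception. */
      public void mergeSlice() throws SliceMergerException {
        try {
          doMerge(); // hypothetical low-level merge step
        } catch (Exception e) {
          // keep both the message and the cause, matching the two-arg constructor
          throw new SliceMergerException("Problem while merging the slice", e);
        }
      }

      private void doMerge() throws Exception {
        // placeholder for the actual merge work
      }
    }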

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
deleted file mode 100644
index cbdd7b4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonDataLoadSchema.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.model;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-/**
- * Wrapper data load schema object which is used to
- * support table relations while loading data
- */
-public class CarbonDataLoadSchema implements Serializable {
-
-  /**
-   * default serial version ID
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * CarbonTable info
-   */
-  private CarbonTable carbonTable;
-
-  /**
-   * CarbonDataLoadSchema constructor which takes CarbonTable
-   *
-   * @param carbonTable
-   */
-  public CarbonDataLoadSchema(CarbonTable carbonTable) {
-    this.carbonTable = carbonTable;
-  }
-
-  /**
-   * get carbontable
-   *
-   * @return carbonTable
-   */
-  public CarbonTable getCarbonTable() {
-    return carbonTable;
-  }
-
-}
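
This wrapper is presumably relocated alongside CarbonLoadModel; the new
package in the sketch below is an assumption based on the import changes in
this commit. Usage is a one-liner:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;

    public final class SchemaSketch {
      /** Wrap an existing CarbonTable so the load path can carry it around. */
      static CarbonDataLoadSchema wrap(CarbonTable carbonTable) {
        CarbonDataLoadSchema schema = new CarbonDataLoadSchema(carbonTable);
        // getCarbonTable() is the only accessor; the wrapper adds no extra state
        return schema;
      }
    }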

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
deleted file mode 100644
index be2c8a5..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ /dev/null
@@ -1,764 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.model;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-
-public class CarbonLoadModel implements Serializable {
-
-  private static final long serialVersionUID = 6580168429197697465L;
-
-  private String databaseName;
-
-  private String tableName;
-
-  private String factFilePath;
-
-  private String colDictFilePath;
-
-  private String partitionId;
-
-  private CarbonDataLoadSchema carbonDataLoadSchema;
-
-  private boolean aggLoadRequest;
-
-  private String storePath;
-
-  private boolean isRetentionRequest;
-
-  private String csvHeader;
-  private String[] csvHeaderColumns;
-  private String csvDelimiter;
-  private String complexDelimiterLevel1;
-  private String complexDelimiterLevel2;
-
-  private boolean isDirectLoad;
-  private List<LoadMetadataDetails> loadMetadataDetails;
-  private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
-
-  private String blocksID;
-
-  /**
-   * Map from carbon dimension to predefined dictionary file path
-   */
-  private HashMap<CarbonDimension, String> predefDictMap;
-
-  /**
-   * task id, each spark task has a unique id
-   */
-  private String taskNo;
-  /**
-   * new load start time
-   */
-  private long factTimeStamp;
-  /**
-   * load Id
-   */
-  private String segmentId;
-
-  private String allDictPath;
-
-  /**
-   * escape Char
-   */
-  private String escapeChar;
-
-  /**
-   * quote Char
-   */
-  private String quoteChar;
-
-  /**
-   * comment Char
-   */
-  private String commentChar;
-
-  private String dateFormat;
-
-  private String defaultTimestampFormat;
-
-  private String defaultDateFormat;
-
-  /**
-   * defines the string that should be treated as null while loading data
-   */
-  private String serializationNullFormat;
-
-  /**
-   * defines the string to specify whether the bad record logger should be enabled or not
-   */
-  private String badRecordsLoggerEnable;
-
-  /**
-   * defines the option to specify the bad record logger action
-   */
-  private String badRecordsAction;
-
-  /**
-   * Max number of columns that needs to be parsed by univocity parser
-   */
-  private String maxColumns;
-
-  /**
-   * defines the string to specify whether empty data is good or bad
-   */
-  private String isEmptyDataBadRecord;
-
-  /**
-   * Use one pass to generate dictionary
-   */
-  private boolean useOnePass;
-
-  /**
-   * dictionary server host
-   */
-  private String dictionaryServerHost;
-
-  /**
-   * dictionary server port
-   */
-  private int dictionaryServerPort;
-
-  /**
-   * Pre fetch data from csv reader
-   */
-  private boolean preFetch;
-
-  /**
-   * Whether batch sort should be enabled or not
-   */
-  private String sortScope;
-
-  /**
-   * Batch sort size in mb.
-   */
-  private String batchSortSizeInMb;
-  /**
-   * bad record location
-   */
-  private String badRecordsLocation;
-
-  /**
-   * Number of partitions in global sort.
-   */
-  private String globalSortPartitions;
-
-  /**
-   * get escape char
-   *
-   * @return
-   */
-  public String getEscapeChar() {
-    return escapeChar;
-  }
-
-  /**
-   * set escape char
-   *
-   * @param escapeChar
-   */
-  public void setEscapeChar(String escapeChar) {
-    this.escapeChar = escapeChar;
-  }
-
-  public String getCsvDelimiter() {
-    return csvDelimiter;
-  }
-
-  public void setCsvDelimiter(String csvDelimiter) {
-    this.csvDelimiter = csvDelimiter;
-  }
-
-  public String getComplexDelimiterLevel1() {
-    return complexDelimiterLevel1;
-  }
-
-  public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
-    this.complexDelimiterLevel1 = complexDelimiterLevel1;
-  }
-
-  public String getComplexDelimiterLevel2() {
-    return complexDelimiterLevel2;
-  }
-
-  public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
-    this.complexDelimiterLevel2 = complexDelimiterLevel2;
-  }
-
-  public boolean isDirectLoad() {
-    return isDirectLoad;
-  }
-
-  public void setDirectLoad(boolean isDirectLoad) {
-    this.isDirectLoad = isDirectLoad;
-  }
-
-  public String getAllDictPath() {
-    return allDictPath;
-  }
-
-  public void setAllDictPath(String allDictPath) {
-    this.allDictPath = allDictPath;
-  }
-
-  public String getCsvHeader() {
-    return csvHeader;
-  }
-
-  public void setCsvHeader(String csvHeader) {
-    this.csvHeader = csvHeader;
-  }
-
-  public String[] getCsvHeaderColumns() {
-    return csvHeaderColumns;
-  }
-
-  public void setCsvHeaderColumns(String[] csvHeaderColumns) {
-    this.csvHeaderColumns = csvHeaderColumns;
-  }
-
-  public void initPredefDictMap() {
-    predefDictMap = new HashMap<>();
-  }
-
-  public String getPredefDictFilePath(CarbonDimension dimension) {
-    return predefDictMap.get(dimension);
-  }
-
-  public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) {
-    this.predefDictMap.put(dimension, predefDictFilePath);
-  }
-
-  /**
-   * @return carbon dataload schema
-   */
-  public CarbonDataLoadSchema getCarbonDataLoadSchema() {
-    return carbonDataLoadSchema;
-  }
-
-  /**
-   * @param carbonDataLoadSchema
-   */
-  public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) {
-    this.carbonDataLoadSchema = carbonDataLoadSchema;
-  }
-
-  /**
-   * @return the databaseName
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * @param databaseName the databaseName to set
-   */
-  public void setDatabaseName(String databaseName) {
-    this.databaseName = databaseName;
-  }
-
-  /**
-   * @return the tableName
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * @param tableName the tableName to set
-   */
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  /**
-   * @return the factFilePath
-   */
-  public String getFactFilePath() {
-    return factFilePath;
-  }
-
-  /**
-   * @param factFilePath the factFilePath to set
-   */
-  public void setFactFilePath(String factFilePath) {
-    this.factFilePath = factFilePath;
-  }
-
-  /**
-   * @return external column dictionary file path
-   */
-  public String getColDictFilePath() {
-    return colDictFilePath;
-  }
-
-  /**
-   * set external column dictionary file path
-   *
-   * @param colDictFilePath
-   */
-  public void setColDictFilePath(String colDictFilePath) {
-    this.colDictFilePath = colDictFilePath;
-  }
-
-  /**
-   * get copy with partition
-   *
-   * @param uniqueId
-   * @return
-   */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId) {
-    CarbonLoadModel copy = new CarbonLoadModel();
-    copy.tableName = tableName;
-    copy.factFilePath = factFilePath + '/' + uniqueId;
-    copy.databaseName = databaseName;
-    copy.partitionId = uniqueId;
-    copy.aggLoadRequest = aggLoadRequest;
-    copy.loadMetadataDetails = loadMetadataDetails;
-    copy.isRetentionRequest = isRetentionRequest;
-    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
-    copy.carbonDataLoadSchema = carbonDataLoadSchema;
-    copy.blocksID = blocksID;
-    copy.taskNo = taskNo;
-    copy.factTimeStamp = factTimeStamp;
-    copy.segmentId = segmentId;
-    copy.serializationNullFormat = serializationNullFormat;
-    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copy.badRecordsAction = badRecordsAction;
-    copy.escapeChar = escapeChar;
-    copy.quoteChar = quoteChar;
-    copy.commentChar = commentChar;
-    copy.dateFormat = dateFormat;
-    copy.defaultTimestampFormat = defaultTimestampFormat;
-    copy.maxColumns = maxColumns;
-    copy.storePath = storePath;
-    copy.useOnePass = useOnePass;
-    copy.dictionaryServerHost = dictionaryServerHost;
-    copy.dictionaryServerPort = dictionaryServerPort;
-    copy.preFetch = preFetch;
-    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
-    copy.sortScope = sortScope;
-    copy.batchSortSizeInMb = batchSortSizeInMb;
-    copy.badRecordsLocation = badRecordsLocation;
-    return copy;
-  }
-
-  /**
-   * Get copy with taskNo.
-   * A broadcast value is shared within a process, so we need to copy it to make sure
-   * each task gets its own independent value.
-   *
-   * @return
-   */
-  public CarbonLoadModel getCopyWithTaskNo(String taskNo) {
-    CarbonLoadModel copy = new CarbonLoadModel();
-    copy.tableName = tableName;
-    copy.factFilePath = factFilePath;
-    copy.databaseName = databaseName;
-    copy.partitionId = partitionId;
-    copy.aggLoadRequest = aggLoadRequest;
-    copy.loadMetadataDetails = loadMetadataDetails;
-    copy.isRetentionRequest = isRetentionRequest;
-    copy.csvHeader = csvHeader;
-    copy.csvHeaderColumns = csvHeaderColumns;
-    copy.isDirectLoad = isDirectLoad;
-    copy.csvDelimiter = csvDelimiter;
-    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
-    copy.carbonDataLoadSchema = carbonDataLoadSchema;
-    copy.blocksID = blocksID;
-    copy.taskNo = taskNo;
-    copy.factTimeStamp = factTimeStamp;
-    copy.segmentId = segmentId;
-    copy.serializationNullFormat = serializationNullFormat;
-    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copy.badRecordsAction = badRecordsAction;
-    copy.escapeChar = escapeChar;
-    copy.quoteChar = quoteChar;
-    copy.commentChar = commentChar;
-    copy.dateFormat = dateFormat;
-    copy.defaultTimestampFormat = defaultTimestampFormat;
-    copy.maxColumns = maxColumns;
-    copy.storePath = storePath;
-    copy.useOnePass = useOnePass;
-    copy.dictionaryServerHost = dictionaryServerHost;
-    copy.dictionaryServerPort = dictionaryServerPort;
-    copy.preFetch = preFetch;
-    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
-    copy.sortScope = sortScope;
-    copy.batchSortSizeInMb = batchSortSizeInMb;
-    return copy;
-  }
-
-  /**
-   * get CarbonLoadModel with partition
-   *
-   * @param uniqueId
-   * @param filesForPartition
-   * @param header
-   * @param delimiter
-   * @return
-   */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
-      String header, String delimiter) {
-    CarbonLoadModel copyObj = new CarbonLoadModel();
-    copyObj.tableName = tableName;
-    copyObj.factFilePath = null;
-    copyObj.databaseName = databaseName;
-    copyObj.partitionId = uniqueId;
-    copyObj.aggLoadRequest = aggLoadRequest;
-    copyObj.loadMetadataDetails = loadMetadataDetails;
-    copyObj.isRetentionRequest = isRetentionRequest;
-    copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
-    copyObj.csvHeader = header;
-    copyObj.csvHeaderColumns = csvHeaderColumns;
-    copyObj.isDirectLoad = true;
-    copyObj.csvDelimiter = delimiter;
-    copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
-    copyObj.blocksID = blocksID;
-    copyObj.taskNo = taskNo;
-    copyObj.factTimeStamp = factTimeStamp;
-    copyObj.segmentId = segmentId;
-    copyObj.serializationNullFormat = serializationNullFormat;
-    copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copyObj.badRecordsAction = badRecordsAction;
-    copyObj.escapeChar = escapeChar;
-    copyObj.quoteChar = quoteChar;
-    copyObj.commentChar = commentChar;
-    copyObj.dateFormat = dateFormat;
-    copyObj.defaultTimestampFormat = defaultTimestampFormat;
-    copyObj.maxColumns = maxColumns;
-    copyObj.storePath = storePath;
-    copyObj.useOnePass = useOnePass;
-    copyObj.dictionaryServerHost = dictionaryServerHost;
-    copyObj.dictionaryServerPort = dictionaryServerPort;
-    copyObj.preFetch = preFetch;
-    copyObj.isEmptyDataBadRecord = isEmptyDataBadRecord;
-    copyObj.sortScope = sortScope;
-    copyObj.batchSortSizeInMb = batchSortSizeInMb;
-    copyObj.badRecordsLocation = badRecordsLocation;
-    return copyObj;
-  }
-
-  /**
-   * @return the partitionId
-   */
-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  /**
-   * @param partitionId the partitionId to set
-   */
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  /**
-   * @param storePath The storePath to set.
-   */
-  public void setStorePath(String storePath) {
-    this.storePath = storePath;
-  }
-
-  /**
-   * @return Returns the factStoreLocation.
-   */
-  public String getStorePath() {
-    return storePath;
-  }
-
-  /**
-   * isRetentionRequest
-   *
-   * @return
-   */
-  public boolean isRetentionRequest() {
-    return isRetentionRequest;
-  }
-
-  /**
-   * getLoadMetadataDetails.
-   *
-   * @return
-   */
-  public List<LoadMetadataDetails> getLoadMetadataDetails() {
-    return loadMetadataDetails;
-  }
-
-  /**
-   * setLoadMetadataDetails.
-   *
-   * @param loadMetadataDetails
-   */
-  public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
-    this.loadMetadataDetails = loadMetadataDetails;
-  }
-
-  /**
-   * getSegmentUpdateStatusManager
-   *
-   * @return
-   */
-  public SegmentUpdateStatusManager getSegmentUpdateStatusManager() {
-    return segmentUpdateStatusManager;
-  }
-
-  /**
-   * setSegmentUpdateStatusManager
-   *
-   * @param segmentUpdateStatusManager
-   */
-  public void setSegmentUpdateStatusManager(SegmentUpdateStatusManager segmentUpdateStatusManager) {
-    this.segmentUpdateStatusManager = segmentUpdateStatusManager;
-  }
-
-  /**
-   * @return
-   */
-  public String getTaskNo() {
-    return taskNo;
-  }
-
-  /**
-   * @param taskNo
-   */
-  public void setTaskNo(String taskNo) {
-    this.taskNo = taskNo;
-  }
-
-  /**
-   * @return
-   */
-  public long getFactTimeStamp() {
-    return factTimeStamp;
-  }
-
-  /**
-   * @param factTimeStamp
-   */
-  public void setFactTimeStamp(long factTimeStamp) {
-    this.factTimeStamp = factTimeStamp;
-  }
-
-  public String[] getDelimiters() {
-    return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
-  }
-
-  /**
-   * @return load Id
-   */
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  /**
-   * @param segmentId
-   */
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
-  /**
-   * the method returns the value to be treated as null during data load
-   *
-   * @return
-   */
-  public String getSerializationNullFormat() {
-    return serializationNullFormat;
-  }
-
-  /**
-   * the method sets the value to be treated as null during data load
-   *
-   * @param serializationNullFormat
-   */
-  public void setSerializationNullFormat(String serializationNullFormat) {
-    this.serializationNullFormat = serializationNullFormat;
-  }
-
-  /**
-   * returns the string to enable bad record logger
-   *
-   * @return
-   */
-  public String getBadRecordsLoggerEnable() {
-    return badRecordsLoggerEnable;
-  }
-
-  /**
-   * method sets the string to specify whether to enable or disable the bad record logger.
-   *
-   * @param badRecordsLoggerEnable
-   */
-  public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
-    this.badRecordsLoggerEnable = badRecordsLoggerEnable;
-  }
-
-  public String getQuoteChar() {
-    return quoteChar;
-  }
-
-  public void setQuoteChar(String quoteChar) {
-    this.quoteChar = quoteChar;
-  }
-
-  public String getCommentChar() {
-    return commentChar;
-  }
-
-  public void setCommentChar(String commentChar) {
-    this.commentChar = commentChar;
-  }
-
-  public String getDateFormat() {
-    return dateFormat;
-  }
-
-  public void setDateFormat(String dateFormat) {
-    this.dateFormat = dateFormat;
-  }
-
-  public String getDefaultTimestampFormat() {
-    return defaultTimestampFormat;
-  }
-
-  public void setDefaultTimestampFormat(String defaultTimestampFormat) {
-    this.defaultTimestampFormat = defaultTimestampFormat;
-  }
-
-  /**
-   * @return
-   */
-  public String getMaxColumns() {
-    return maxColumns;
-  }
-
-  /**
-   * @param maxColumns
-   */
-  public void setMaxColumns(String maxColumns) {
-    this.maxColumns = maxColumns;
-  }
-
-  /**
-   * returns option to specify the bad record logger action
-   *
-   * @return
-   */
-  public String getBadRecordsAction() {
-    return badRecordsAction;
-  }
-
-  /**
-   * set option to specify the bad record logger action
-   *
-   * @param badRecordsAction
-   */
-  public void setBadRecordsAction(String badRecordsAction) {
-    this.badRecordsAction = badRecordsAction;
-  }
-
-  public boolean getUseOnePass() {
-    return useOnePass;
-  }
-
-  public void setUseOnePass(boolean useOnePass) {
-    this.useOnePass = useOnePass;
-  }
-
-  public int getDictionaryServerPort() {
-    return dictionaryServerPort;
-  }
-
-  public void setDictionaryServerPort(int dictionaryServerPort) {
-    this.dictionaryServerPort = dictionaryServerPort;
-  }
-
-  public String getDictionaryServerHost() {
-    return dictionaryServerHost;
-  }
-
-  public void setDictionaryServerHost(String dictionaryServerHost) {
-    this.dictionaryServerHost = dictionaryServerHost;
-  }
-
-  public boolean isPreFetch() {
-    return preFetch;
-  }
-
-  public void setPreFetch(boolean preFetch) {
-    this.preFetch = preFetch;
-  }
-
-  public String getDefaultDateFormat() {
-    return defaultDateFormat;
-  }
-
-  public void setDefaultDateFormat(String defaultDateFormat) {
-    this.defaultDateFormat = defaultDateFormat;
-  }
-
-  public String getIsEmptyDataBadRecord() {
-    return isEmptyDataBadRecord;
-  }
-
-  public void setIsEmptyDataBadRecord(String isEmptyDataBadRecord) {
-    this.isEmptyDataBadRecord = isEmptyDataBadRecord;
-  }
-
-  public String getSortScope() {
-    return sortScope;
-  }
-
-  public void setSortScope(String sortScope) {
-    this.sortScope = sortScope;
-  }
-
-  public String getBatchSortSizeInMb() {
-    return batchSortSizeInMb;
-  }
-
-  public void setBatchSortSizeInMb(String batchSortSizeInMb) {
-    this.batchSortSizeInMb = batchSortSizeInMb;
-  }
-
-  public String getGlobalSortPartitions() {
-    return globalSortPartitions;
-  }
-
-  public void setGlobalSortPartitions(String globalSortPartitions) {
-    this.globalSortPartitions = globalSortPartitions;
-  }
-
-  public String getBadRecordsLocation() {
-    return badRecordsLocation;
-  }
-
-  public void setBadRecordsLocation(String badRecordsLocation) {
-    this.badRecordsLocation = badRecordsLocation;
-  }
-}
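
CarbonLoadModel moves to org.apache.carbondata.processing.loading.model, as
the RowResultMergerProcessor hunk above shows. A hedged sketch of the
broadcast-then-copy pattern described in the getCopyWithTaskNo javadoc; the
field values are illustrative only:

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    public final class LoadModelSketch {
      static CarbonLoadModel buildAndCopyForTask() {
        CarbonLoadModel model = new CarbonLoadModel();
        model.setDatabaseName("default");        // illustrative values
        model.setTableName("sales");
        model.setFactFilePath("/tmp/sales.csv");
        model.setCsvDelimiter(",");
        // the model is typically broadcast to executors; each task then takes
        // its own copy so per-task state such as taskNo stays independent
        return model.getCopyWithTaskNo("0");
      }
    }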

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
deleted file mode 100644
index 9e0aa02..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-
-/**
- * This is the base abstract class for data loading.
- * It can do transformation jobs as per the implementation.
- *
- * The life cycle of this class is:
- * first initialize() is called to initialize the step,
- * then execute() is called to process the step logic, and
- * finally close() is called to close any resources opened in the step.
- */
-public abstract class AbstractDataLoadProcessorStep {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(AbstractDataLoadProcessorStep.class.getName());
-
-  protected CarbonDataLoadConfiguration configuration;
-
-  protected AbstractDataLoadProcessorStep child;
-
-  protected AtomicLong rowCounter;
-
-  protected boolean closed;
-
-  public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child) {
-    this.configuration = configuration;
-    this.child = child;
-    this.rowCounter = new AtomicLong();
-    this.closed = false;
-  }
-
-  /**
-   * The output meta for this step. The data returned from this step is as per this meta.
-   *
-   */
-  public abstract DataField[] getOutput();
-
-  /**
-   * Initialization process for this step.
-   *
-   * @throws IOException
-   */
-  public void initialize() throws IOException {
-    if (LOGGER.isInfoEnabled()) {
-      // This thread prints the rows processed in each step every 10 seconds.
-      new Thread() {
-        @Override public void run() {
-          while (!closed) {
-            try {
-              LOGGER.info("Rows processed in step " + getStepName() + " : " + rowCounter.get());
-              Thread.sleep(10000);
-            } catch (InterruptedException e) {
-              //ignore
-              LOGGER.error(e.getMessage());
-            }
-          }
-        }
-      }.start();
-    }
-  }
-
-  /**
-   * Transform the data as per the implementation.
-   *
-   * @return Array of Iterators with data. They can be processed in parallel if the
-   * implementation class wants.
-   * @throws CarbonDataLoadingException
-   */
-  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
-    Iterator<CarbonRowBatch>[] childIters = child.execute();
-    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
-    for (int i = 0; i < childIters.length; i++) {
-      iterators[i] = getIterator(childIters[i]);
-    }
-    return iterators;
-  }
-
-  /**
-   * Create the iterator using child iterator.
-   *
-   * @param childIter
-   * @return new iterator with step specific processing.
-   */
-  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
-    return new CarbonIterator<CarbonRowBatch>() {
-      @Override public boolean hasNext() {
-        return childIter.hasNext();
-      }
-
-      @Override public CarbonRowBatch next() {
-        return processRowBatch(childIter.next());
-      }
-    };
-  }
-
-  /**
-   * Process the batch of rows as per the step logic.
-   *
-   * @param rowBatch
-   * @return processed row batch.
-   */
-  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
-    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
-    while (rowBatch.hasNext()) {
-      newBatch.addRow(processRow(rowBatch.next()));
-    }
-    return newBatch;
-  }
-
-  /**
-   * Process the row as per the step logic.
-   *
-   * @param row
-   * @return processed row.
-   */
-  protected abstract CarbonRow processRow(CarbonRow row);
-
-  /**
-   * Get the step name for logging purpose.
-   * @return Step name
-   */
-  protected abstract String getStepName();
-
-
-  /**
-   * Close all resources. This method is called after execute() is finished.
-   * It will be called in both success and failure cases.
-   */
-  public void close() {
-    if (!closed) {
-      closed = true;
-      LOGGER.info("Total rows processed in step " + this.getStepName() + ": " + rowCounter.get());
-      if (child != null) {
-        child.close();
-      }
-    }
-  }
-
-}
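
AbstractDataLoadProcessorStep moves to org.apache.carbondata.processing.loading
in the new layout. The template it defines: subclasses implement getOutput(),
processRow() and getStepName(), while the base class wires up iteration, row
counting and close(). A minimal pass-through step, sketched against the
relocated class; the class and step name are made up for illustration:

    import org.apache.carbondata.core.datastore.row.CarbonRow;
    import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
    import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
    import org.apache.carbondata.processing.loading.DataField;

    /** A step that forwards rows unchanged, counting them as it goes. */
    public class NoOpProcessorStepImpl extends AbstractDataLoadProcessorStep {

      public NoOpProcessorStepImpl(CarbonDataLoadConfiguration configuration,
          AbstractDataLoadProcessorStep child) {
        super(configuration, child);
      }

      @Override public DataField[] getOutput() {
        return child.getOutput();     // unchanged schema; assumes a non-null child step
      }

      @Override protected CarbonRow processRow(CarbonRow row) {
        rowCounter.getAndIncrement(); // read by the periodic logger thread
        return row;                   // pass-through: no transformation
      }

      @Override protected String getStepName() {
        return "No-Op Processor";     // made-up name, used only for logging
      }
    }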