You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:26 UTC

[39/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index 9cf8a91..10e6785 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.spark.load.FailureCauses
+import org.apache.carbondata.processing.loading.FailureCauses
 
 /**
  * IUD update delete and compaction framework.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index 036ca49..5e9d31f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, Lock
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.spark.load.FailureCauses
+import org.apache.carbondata.processing.loading.FailureCauses
 
 private[sql] case class ProjectForUpdateCommand(
     plan: LogicalPlan, tableIdentifier: Seq[String])

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
index 1f06aed..e0b891a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableDropCarbonPartitionCommand.scala
@@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 
 case class AlterTableDropCarbonPartitionCommand(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
index 21b974a..e16dfc9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/AlterTableSplitCarbonPartitionCommand.scala
@@ -39,7 +39,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 61589de..5c7d451 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -25,8 +25,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
 
 /**
   * Test Case for org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index e7eb422..fd3b2cd 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentif
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 /**
  * Utility for global dictionary test cases

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 4746ecf..399665f 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -27,9 +27,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.constants.TableOptionConstant
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
deleted file mode 100644
index 890534e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/DataLoadModel.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.api.dataloader;
-
-public class DataLoadModel {
-  /**
-   * Schema Info
-   */
-  private SchemaInfo schemaInfo;
-
-  /**
-   * table table
-   */
-  private String tableName;
-
-  /**
-   * is CSV load
-   */
-  private boolean isCsvLoad;
-
-  private String blocksID;
-  /**
-   * task id, each spark task has a unique id
-   */
-  private String taskNo;
-  /**
-   * new load start time
-   */
-  private String factTimeStamp;
-
-  private String escapeCharacter;
-
-  private String quoteCharacter;
-
-  private String commentCharacter;
-
-  private String rddIteratorKey;
-
-  private String dateFormat;
-
-  private String maxColumns;
-  /**
-   * @return Returns the schemaInfo.
-   */
-  public SchemaInfo getSchemaInfo() {
-    return schemaInfo;
-  }
-
-  /**
-   * @param schemaInfo The schemaInfo to set.
-   */
-  public void setSchemaInfo(SchemaInfo schemaInfo) {
-    this.schemaInfo = schemaInfo;
-  }
-
-  /**
-   * @return Returns the tableName.
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * @param tableName The tableName to set.
-   */
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  /**
-   * @return Returns the isCsvLoad.
-   */
-  public boolean isCsvLoad() {
-    return isCsvLoad;
-  }
-
-  /**
-   * @param isCsvLoad The isCsvLoad to set.
-   */
-  public void setCsvLoad(boolean isCsvLoad) {
-    this.isCsvLoad = isCsvLoad;
-  }
-
-  /**
-   * get block id
-   *
-   * @return
-   */
-  public String getBlocksID() {
-    return blocksID;
-  }
-
-  /**
-   * set block id to data load model
-   *
-   * @param blocksID
-   */
-  public void setBlocksID(String blocksID) {
-    this.blocksID = blocksID;
-  }
-
-  /**
-   * @return
-   */
-  public String getTaskNo() {
-    return taskNo;
-  }
-
-  /**
-   * @param taskNo
-   */
-  public void setTaskNo(String taskNo) {
-    this.taskNo = taskNo;
-  }
-
-  /**
-   * @return
-   */
-  public String getFactTimeStamp() {
-    return factTimeStamp;
-  }
-
-  /**
-   * @param factTimeStamp
-   */
-  public void setFactTimeStamp(String factTimeStamp) {
-    this.factTimeStamp = factTimeStamp;
-  }
-
-  public String getEscapeCharacter() {
-    return escapeCharacter;
-  }
-
-  public void setEscapeCharacter(String escapeCharacter) {
-    this.escapeCharacter = escapeCharacter;
-  }
-
-  public String getQuoteCharacter() { return quoteCharacter; }
-
-  public void setQuoteCharacter(String quoteCharacter) { this.quoteCharacter = quoteCharacter; }
-
-  public String getCommentCharacter() { return commentCharacter; }
-
-  public void setCommentCharacter(String commentCharacter) {
-    this.commentCharacter = commentCharacter;
-  }
-
-  public String getDateFormat() { return dateFormat; }
-
-  public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; }
-  /**
-   * @return
-   */
-  public String getMaxColumns() {
-    return maxColumns;
-  }
-
-  /**
-   * @param maxColumns
-   */
-  public void setMaxColumns(String maxColumns) {
-    this.maxColumns = maxColumns;
-  }
-
-  public String getRddIteratorKey() {
-    return rddIteratorKey;
-  }
-
-  public void setRddIteratorKey(String rddIteratorKey) {
-    this.rddIteratorKey = rddIteratorKey;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java b/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
deleted file mode 100644
index 88c4879..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/api/dataloader/SchemaInfo.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.api.dataloader;
-
-public class SchemaInfo {
-
-  /**
-   * databaseName
-   */
-  private String databaseName;
-
-  /**
-   * tableName
-   */
-  private String tableName;
-
-  /**
-   * isAutoAggregateRequest
-   */
-  private boolean isAutoAggregateRequest;
-
-  private String complexDelimiterLevel1;
-
-  private String complexDelimiterLevel2;
-  /**
-   * the value to be treated as null while data load
-   */
-  private String serializationNullFormat;
-
-  /**
-   * defines the string to specify whether the bad record logger should be enabled or not
-   */
-  private String badRecordsLoggerEnable;
-  /**
-   * defines the option to specify whether to bad record logger action
-   */
-  private String badRecordsLoggerAction;
-
-
-  public String getComplexDelimiterLevel1() {
-    return complexDelimiterLevel1;
-  }
-
-  public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
-    this.complexDelimiterLevel1 = complexDelimiterLevel1;
-  }
-
-  public String getComplexDelimiterLevel2() {
-    return complexDelimiterLevel2;
-  }
-
-  public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
-    this.complexDelimiterLevel2 = complexDelimiterLevel2;
-  }
-
-  public String getTableName() {
-    return tableName;
-  }
-
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  /**
-   * @return the isAutoAggregateRequest
-   */
-  public boolean isAutoAggregateRequest() {
-    return isAutoAggregateRequest;
-  }
-
-  /**
-   * @param isAutoAggregateRequest the isAutoAggregateRequest to set
-   */
-  public void setAutoAggregateRequest(boolean isAutoAggregateRequest) {
-    this.isAutoAggregateRequest = isAutoAggregateRequest;
-  }
-
-  /**
-   * @return the databaseName
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * @param databaseName the databaseName to set
-   */
-  public void setDatabaseName(String databaseName) {
-    this.databaseName = databaseName;
-  }
-
-  /**
-   * the method returns the value to be treated as null while data load
-   * @return
-   */
-  public String getSerializationNullFormat() {
-    return serializationNullFormat;
-  }
-
-  /**
-   * the method sets the value to be treated as null while data load
-   * @param serializationNullFormat
-   */
-  public void setSerializationNullFormat(String serializationNullFormat) {
-    this.serializationNullFormat = serializationNullFormat;
-  }
-
-  /**
-   * returns the string to enable bad record logger
-   * @return
-   */
-  public String getBadRecordsLoggerEnable() {
-    return badRecordsLoggerEnable;
-  }
-
-  /**
-   * method sets the string to specify whether to enable or dissable the badrecord logger.
-   * @param badRecordsLoggerEnable
-   */
-  public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
-    this.badRecordsLoggerEnable = badRecordsLoggerEnable;
-  }
-
-  /**
-   * returns the option to set bad record logger action
-   * @return
-   */
-  public String getBadRecordsLoggerAction() {
-    return badRecordsLoggerAction;
-  }
-
-  /**
-   * set the option to set set bad record logger action
-   * @param badRecordsLoggerAction
-   */
-  public void setBadRecordsLoggerAction(String badRecordsLoggerAction) {
-    this.badRecordsLoggerAction = badRecordsLoggerAction;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
deleted file mode 100644
index 05f561f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/constants/DataProcessorConstants.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.constants;
-
-public final class DataProcessorConstants {
-  /**
-   *
-   */
-  public static final String CSV_DATALOADER = "CSV_DATALOADER";
-  /**
-   *
-   */
-  public static final String DATARESTRUCT = "DATARESTRUCT";
-  /**
-   * UPDATEMEMBER
-   */
-  public static final String UPDATEMEMBER = "UPDATEMEMBER";
-  /**
-   * number of days task should be in DB table
-   */
-  public static final String TASK_RETENTION_DAYS = "dataload.taskstatus.retention";
-  /**
-   * LOAD_FOLDER
-   */
-  public static final String LOAD_FOLDER = "Load_";
-  /**
-   * if bad record found
-   */
-  public static final long BAD_REC_FOUND = 223732673;
-  /**
-   * if bad record found
-   */
-  public static final long CSV_VALIDATION_ERRROR_CODE = 113732678;
-  /**
-   * Year Member val for data retention.
-   */
-  public static final String YEAR = "YEAR";
-
-  /**
-   * if data load fails due to bad record
-   */
-  public static final long BAD_REC_FAILURE_ERROR_CODE = 223732674;
-
-  private DataProcessorConstants() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java b/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
deleted file mode 100644
index 3917974..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/constants/TableOptionConstant.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.constants;
-
-/**
- * enum holds the value related to the ddl option
- */
-public enum TableOptionConstant {
-  SERIALIZATION_NULL_FORMAT("serialization_null_format"),
-  BAD_RECORDS_LOGGER_ENABLE("bad_records_logger_enable"),
-  BAD_RECORDS_ACTION("bad_records_action");
-
-  private String name;
-
-  /**
-   * constructor to initialize the enum value
-   * @param name
-   */
-  TableOptionConstant(String name) {
-    this.name = name;
-  }
-
-  public String getName() {
-    return name;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
deleted file mode 100644
index d6d214b..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.csvload;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-/**
- * blocks info
- */
-public class BlockDetails extends FileSplit implements Serializable {
-
-  /**
-   * serialization version
-   */
-  private static final long serialVersionUID = 2293906691860002339L;
-  //block offset
-  private long blockOffset;
-  //block length
-  private long blockLength;
-  //file path which block belong to
-  private String filePath;
-  // locations where this block exists
-  private String[] locations;
-
-  public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
-    super(filePath, blockOffset, blockLength, locations);
-    this.filePath = filePath.toString();
-    this.blockOffset = blockOffset;
-    this.blockLength = blockLength;
-    this.locations = locations;
-  }
-
-  public long getBlockOffset() {
-    return blockOffset;
-  }
-
-  public long getBlockLength() {
-    return blockLength;
-  }
-
-  public String getFilePath() {
-    return FileFactory.getUpdatedFilePath(filePath);
-  }
-
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
-  }
-
-  public String[] getLocations() {
-    return locations;
-  }
-
-  /** The file containing this split's data. */
-  @Override
-  public Path getPath() { return new Path(filePath); }
-
-  /** The position of the first byte in the file to process. */
-  @Override
-  public long getStart() { return blockOffset; }
-
-  /** The number of bytes in the file to process. */
-  @Override
-  public long getLength() { return blockLength; }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
deleted file mode 100644
index 9f80c07..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Customarized reader class to read data from file
- * untill the upper threshold reached.
- */
-public class BoundedInputStream extends InputStream {
-
-  /**
-   * byte value of the new line character
-   */
-  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
-  /**
-   * number of extra character to read
-   */
-  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
-  /**
-   * number of bytes remaining
-   */
-  private long remaining;
-  /**
-   * to check whether end of line is found
-   */
-  private boolean endOfLineFound = false;
-
-  private DataInputStream in;
-
-  public BoundedInputStream(DataInputStream in, long limit) {
-    this.in = in;
-    this.remaining = limit;
-  }
-
-  /**
-   * Below method will be used to read the data from file
-   *
-   * @throws IOException
-   *           problem while reading
-   */
-  @Override
-  public int read() throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      int var1 = this.in.read();
-      if (var1 >= 0) {
-        --this.remaining;
-      }
-
-      return var1;
-    }
-  }
-
-  /**
-   * Below method will be used to read the data from file. If limit reaches in
-   * that case it will read until new line character is reached
-   *
-   * @param buffer
-   *          buffer in which data will be read
-   * @param offset
-   *          from position to buffer will be filled
-   * @param length
-   *          number of character to be read
-   * @throws IOException
-   *           problem while reading
-   */
-  @Override
-  public int read(byte[] buffer, int offset, int length) throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      if (this.remaining < length) {
-        length = (int) this.remaining;
-      }
-
-      length = this.in.read(buffer, offset, length);
-      if (length >= 0) {
-        this.remaining -= length;
-        if (this.remaining == 0 && !endOfLineFound) {
-          endOfLineFound = true;
-          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-        } else if (endOfLineFound) {
-          int end = offset + length;
-          for (int i = offset; i < end; i++) {
-            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
-              this.remaining = 0;
-              return (i - offset) + 1;
-            }
-          }
-          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-        }
-      }
-      return length;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (in != null) {
-      in.close();
-    }
-  }
-
-  public long getRemaining() {
-    return  this.remaining;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
deleted file mode 100644
index c793126..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.nio.charset.Charset;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-import com.univocity.parsers.csv.CsvParser;
-import com.univocity.parsers.csv.CsvParserSettings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.LineReader;
-
-/**
- * An {@link org.apache.hadoop.mapreduce.InputFormat} for csv files.  Files are broken into lines.
- * Values are the line of csv files.
- */
-public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWritable> {
-
-  public static final String DELIMITER = "carbon.csvinputformat.delimiter";
-  public static final String DELIMITER_DEFAULT = ",";
-  public static final String COMMENT = "carbon.csvinputformat.comment";
-  public static final String COMMENT_DEFAULT = "#";
-  public static final String QUOTE = "carbon.csvinputformat.quote";
-  public static final String QUOTE_DEFAULT = "\"";
-  public static final String ESCAPE = "carbon.csvinputformat.escape";
-  public static final String ESCAPE_DEFAULT = "\\";
-  public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
-  public static final boolean HEADER_PRESENT_DEFAULT = false;
-  public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
-  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
-  public static final String MAX_COLUMNS = "carbon.csvinputformat.max.columns";
-  public static final String NUMBER_OF_COLUMNS = "carbon.csvinputformat.number.of.columns";
-  public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 2000;
-  public static final int THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING = 20000;
-
-  private static LogService LOGGER =
-      LogServiceFactory.getLogService(CSVInputFormat.class.toString());
-
-
-  @Override
-  public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new CSVRecordReader();
-  }
-
-  @Override
-  protected boolean isSplitable(JobContext context, Path file) {
-    final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration())
-        .getCodec(file);
-    if (null == codec) {
-      return true;
-    }
-    return codec instanceof SplittableCompressionCodec;
-  }
-
-  /**
-   * Sets the comment char to configuration. Default it is #.
-   * @param configuration
-   * @param commentChar
-   */
-  public static void setCommentCharacter(Configuration configuration, String commentChar) {
-    if (commentChar != null && !commentChar.isEmpty()) {
-      configuration.set(COMMENT, commentChar);
-    }
-  }
-
-  /**
-   * Sets the delimiter to configuration. Default it is ','
-   * @param configuration
-   * @param delimiter
-   */
-  public static void setCSVDelimiter(Configuration configuration, String delimiter) {
-    if (delimiter != null && !delimiter.isEmpty()) {
-      configuration.set(DELIMITER, delimiter);
-    }
-  }
-
-  /**
-   * Sets the escape character to configuration. Default it is \
-   * @param configuration
-   * @param escapeCharacter
-   */
-  public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
-    if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
-      configuration.set(ESCAPE, escapeCharacter);
-    }
-  }
-
-  /**
-   * Whether header needs to read from csv or not. By default it is false.
-   * @param configuration
-   * @param headerExtractEnable
-   */
-  public static void setHeaderExtractionEnabled(Configuration configuration,
-      boolean headerExtractEnable) {
-    configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
-  }
-
-  /**
-   * Sets the quote character to configuration. Default it is "
-   * @param configuration
-   * @param quoteCharacter
-   */
-  public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
-    if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
-      configuration.set(QUOTE, quoteCharacter);
-    }
-  }
-
-  /**
-   * Sets the read buffer size to configuration.
-   * @param configuration
-   * @param bufferSize
-   */
-  public static void setReadBufferSize(Configuration configuration, String bufferSize) {
-    if (bufferSize != null && !bufferSize.isEmpty()) {
-      configuration.set(READ_BUFFER_SIZE, bufferSize);
-    }
-  }
-
-  public static void setMaxColumns(Configuration configuration, String maxColumns) {
-    if (maxColumns != null) {
-      configuration.set(MAX_COLUMNS, maxColumns);
-    }
-  }
-
-  public static void setNumberOfColumns(Configuration configuration, String numberOfColumns) {
-    configuration.set(NUMBER_OF_COLUMNS, numberOfColumns);
-  }
-
-  /**
-   * Treats value as line in file. Key is null.
-   */
-  public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
-
-    private long start;
-    private long end;
-    private BoundedInputStream boundedInputStream;
-    private Reader reader;
-    private CsvParser csvParser;
-    private StringArrayWritable value;
-    private String[] columns;
-    private Seekable filePosition;
-    private boolean isCompressedInput;
-    private Decompressor decompressor;
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      FileSplit split = (FileSplit) inputSplit;
-      start = split.getStart();
-      end = start + split.getLength();
-      Path file = split.getPath();
-      Configuration job = context.getConfiguration();
-      CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
-      FileSystem fs = file.getFileSystem(job);
-      int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
-      FSDataInputStream fileIn = fs.open(file, bufferSize);
-      InputStream inputStream;
-      if (codec != null) {
-        isCompressedInput = true;
-        decompressor = CodecPool.getDecompressor(codec);
-        if (codec instanceof SplittableCompressionCodec) {
-          SplitCompressionInputStream scIn = ((SplittableCompressionCodec) codec)
-              .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec
-                  .READ_MODE.BYBLOCK);
-          start = scIn.getAdjustedStart();
-          end = scIn.getAdjustedEnd();
-          if (start != 0) {
-            LineReader lineReader = new LineReader(scIn, 1);
-            start += lineReader.readLine(new Text(), 0);
-          }
-          filePosition = scIn;
-          inputStream = scIn;
-        } else {
-          CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
-          filePosition = cIn;
-          inputStream = cIn;
-        }
-      } else {
-        fileIn.seek(start);
-        if (start != 0) {
-          LineReader lineReader = new LineReader(fileIn, 1);
-          start += lineReader.readLine(new Text(), 0);
-        }
-        boundedInputStream = new BoundedInputStream(fileIn, end - start);
-        filePosition = fileIn;
-        inputStream = boundedInputStream;
-      }
-      reader = new InputStreamReader(inputStream,
-          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET));
-      csvParser = new CsvParser(extractCsvParserSettings(job));
-      csvParser.beginParsing(reader);
-    }
-
-    private CsvParserSettings extractCsvParserSettings(Configuration job) {
-      CsvParserSettings parserSettings = new CsvParserSettings();
-      parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
-      parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
-      parserSettings.setLineSeparatorDetectionEnabled(true);
-      parserSettings.setNullValue("");
-      parserSettings.setEmptyValue("");
-      parserSettings.setIgnoreLeadingWhitespaces(false);
-      parserSettings.setIgnoreTrailingWhitespaces(false);
-      parserSettings.setSkipEmptyLines(false);
-      parserSettings.setMaxCharsPerColumn(100000);
-      String maxColumns = job.get(MAX_COLUMNS);
-      parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
-      parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
-      parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
-      if (start == 0) {
-        parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
-            HEADER_PRESENT_DEFAULT));
-      }
-      return parserSettings;
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      if (csvParser == null) {
-        return false;
-      }
-      columns = csvParser.parseNext();
-      if (columns == null) {
-        value = null;
-        return false;
-      }
-      if (value == null) {
-        value = new StringArrayWritable();
-      }
-      value.set(columns);
-      return true;
-    }
-
-    @Override
-    public NullWritable getCurrentKey() throws IOException, InterruptedException {
-      return NullWritable.get();
-    }
-
-    @Override
-    public StringArrayWritable getCurrentValue() throws IOException, InterruptedException {
-      return value;
-    }
-
-    private long getPos() throws IOException {
-      long retVal = start;
-      if (null != boundedInputStream) {
-        retVal = end - boundedInputStream.getRemaining();
-      } else if (isCompressedInput && null != filePosition) {
-        retVal = filePosition.getPos();
-      }
-      return retVal;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return start == end ? 0.0F : Math.min(1.0F, (float) (getPos() -
-          start) / (float) (end - start));
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (reader != null) {
-          reader.close();
-        }
-        if (boundedInputStream != null) {
-          boundedInputStream.close();
-        }
-        if (null != csvParser) {
-          csvParser.stopParsing();
-        }
-      } finally {
-        reader = null;
-        boundedInputStream = null;
-        csvParser = null;
-        filePosition = null;
-        value = null;
-        if (decompressor != null) {
-          CodecPool.returnDecompressor(decompressor);
-          decompressor = null;
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
deleted file mode 100644
index efe75ef..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.csvload;
-
-import java.io.IOException;
-
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import com.univocity.parsers.common.TextParsingException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * It is wrapper iterator around @{@link RecordReader}.
- */
-public class CSVRecordReaderIterator extends CarbonIterator<Object []> {
-
-  private RecordReader<NullWritable, StringArrayWritable> recordReader;
-
-  /**
-   * It is just a little hack to make recordreader as iterator. Usually we cannot call hasNext
-   * multiple times on record reader as it moves another line. To avoid that situation like hasNext
-   * only tells whether next row is present or not and next will move the pointer to next row after
-   * consuming it.
-   */
-  private boolean isConsumed;
-
-  private InputSplit split;
-
-  private TaskAttemptContext context;
-
-  public CSVRecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
-      InputSplit split, TaskAttemptContext context) {
-    this.recordReader = recordReader;
-    this.split = split;
-    this.context = context;
-  }
-
-  @Override
-  public boolean hasNext() {
-    try {
-      if (!isConsumed) {
-        isConsumed = recordReader.nextKeyValue();
-        return isConsumed;
-      }
-      return true;
-    } catch (Exception e) {
-      if (e instanceof TextParsingException) {
-        throw new CarbonDataLoadingException(
-            CarbonDataProcessorUtil.trimErrorMessage(e.getMessage()));
-      }
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  @Override
-  public Object[] next() {
-    try {
-      String[] data = recordReader.getCurrentValue().get();
-      isConsumed = false;
-      return data;
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  @Override
-  public void initialize() {
-    try {
-      recordReader.initialize(split, context);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      recordReader.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
deleted file mode 100644
index 7eb3ec9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.csvload;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * A String sequence that is usable as a key or value.
- */
-public class StringArrayWritable implements Writable {
-  private String[] values;
-
-  public String[] toStrings() {
-    return values;
-  }
-
-  public void set(String[] values) {
-    this.values = values;
-  }
-
-  public String[] get() {
-    return values;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int length = in.readInt();
-    values = new String[length];
-    for (int i = 0; i < length; i++) {
-      byte[] b = new byte[in.readInt()];
-      in.readFully(b);
-      values[i] = new String(b, Charset.defaultCharset());
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(values.length);                 // write values
-    for (int i = 0; i < values.length; i++) {
-      byte[] b = values[i].getBytes(Charset.defaultCharset());
-      out.writeInt(b.length);
-      out.write(b);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java b/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java
deleted file mode 100644
index 5cf2078..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/dataprocessor/manager/CarbonDataProcessorManager.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.dataprocessor.manager;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
-public final class CarbonDataProcessorManager {
-  /**
-   * instance
-   */
-  private static final CarbonDataProcessorManager INSTANCE = new CarbonDataProcessorManager();
-
-  /**
-   * managerHandlerMap
-   */
-  private Map<String, Object> managerHandlerMap;
-
-  /**
-   * private constructor
-   */
-  private CarbonDataProcessorManager() {
-    managerHandlerMap = new HashMap<String, Object>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  }
-
-  /**
-   * Get instance method will be used to get the class instance
-   *
-   * @return
-   */
-  public static CarbonDataProcessorManager getInstance() {
-    return INSTANCE;
-  }
-
-  /**
-   * Below method will be used to get the lock object for all the data processing request.
-   * form the local map, if empty than it will update the map and return the lock object
-   *
-   * @param key
-   * @return
-   */
-  public synchronized Object getDataProcessingLockObject(String key) {
-    Object object = managerHandlerMap.get(key);
-    if (null == object) {
-      object = new Object();
-      managerHandlerMap.put(key, object);
-    }
-    return object;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 7661577..86a6744 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.processing.newflow.complexobjects.ArrayObject;
+import org.apache.carbondata.processing.loading.complexobjects.ArrayObject;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index a9c2bfe..95d7d2e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -44,9 +44,9 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
-import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary;
-import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
+import org.apache.carbondata.processing.loading.dictionary.DictionaryServerClientDictionary;
+import org.apache.carbondata.processing.loading.dictionary.DirectDictionary;
+import org.apache.carbondata.processing.loading.dictionary.PreCreatedDictionary;
 
 /**
  * Primitive DataType stateless object used in data loading

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
index 68b6911..e90fd4a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java
@@ -26,7 +26,7 @@ import java.util.List;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.processing.newflow.complexobjects.StructObject;
+import org.apache.carbondata.processing.loading.complexobjects.StructObject;
 
 /**
  * Struct DataType stateless object used in data loading

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java
deleted file mode 100644
index 4cb5961..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/etl/DataLoadingException.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.etl;
-
-public class DataLoadingException extends Exception {
-  private static final long serialVersionUID = 1L;
-
-  private long errorCode = -1;
-
-  public DataLoadingException() {
-    super();
-  }
-
-  public DataLoadingException(long errorCode, String message) {
-    super(message);
-    this.errorCode = errorCode;
-  }
-
-  public DataLoadingException(String message) {
-    super(message);
-  }
-
-  public DataLoadingException(Throwable cause) {
-    super(cause);
-  }
-
-  public DataLoadingException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public long getErrorCode() {
-    return errorCode;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
new file mode 100644
index 0000000..15ff95e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/DataLoadingException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.exception;
+
+public class DataLoadingException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  private long errorCode = -1;
+
+  public DataLoadingException() {
+    super();
+  }
+
+  public DataLoadingException(long errorCode, String message) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  public DataLoadingException(String message) {
+    super(message);
+  }
+
+  public DataLoadingException(Throwable cause) {
+    super(cause);
+  }
+
+  public DataLoadingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public long getErrorCode() {
+    return errorCode;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
new file mode 100644
index 0000000..d9640a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/SliceMergerException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.exception;
+
+import java.util.Locale;
+
+public class SliceMergerException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public SliceMergerException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public SliceMergerException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * This method is used to get the localized message.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - always an empty string; localization is not implemented here.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
new file mode 100644
index 0000000..9f2482b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+
+/**
+ * This is the base abstract class for the steps of data loading.
+ * It can do transformation jobs as per the implementation.
+ *
+ * Life cycle of this class is
+ * First initialize() is called to initialize the step
+ * then execute() is called to process the step logic and
+ * then close() is called to close any resources if any opened in the step.
+ */
+public abstract class AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractDataLoadProcessorStep.class.getName());
+
+  protected CarbonDataLoadConfiguration configuration;
+  // step whose execute() output feeds this step; close() treats null as "no child"
+  protected AbstractDataLoadProcessorStep child;
+  // row count reported by the progress log in initialize() and by close()
+  protected AtomicLong rowCounter;
+  // volatile: set by close() and polled by the logging thread started in initialize()
+  protected volatile boolean closed;
+
+  public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    this.configuration = configuration;
+    this.child = child;
+    this.rowCounter = new AtomicLong();
+    this.closed = false;
+  }
+
+  /**
+   * The output meta for this step. The data returned from this step is as per this meta.
+   * @return fields describing the rows this step returns
+   */
+  public abstract DataField[] getOutput();
+
+  /**
+   * Initialization process for this step. When info logging is enabled, starts a thread that
+   * logs the row counter of this step every 10 seconds until the step is closed.
+   * @throws IOException never thrown by this base implementation itself
+   */
+  public void initialize() throws IOException {
+    if (LOGGER.isInfoEnabled()) {
+      // This thread prints the rows processed in each step for every 10 seconds.
+      new Thread() {
+        @Override public void run() {
+          while (!closed) {
+            try {
+              LOGGER.info("Rows processed in step " + getStepName() + " : " + rowCounter.get());
+              Thread.sleep(10000);
+            } catch (InterruptedException e) {
+              // an interrupt is only logged; polling continues until closed is set
+              LOGGER.error(e.getMessage());
+            }
+          }
+        }
+      }.start();
+    }
+  }
+
+  /**
+   * Transform the data as per the implementation. This default implementation executes the
+   * child step and wraps each of its iterators with getIterator().
+   * @return Array of Iterator with data. It can be processed parallel if implementation class wants
+   * @throws CarbonDataLoadingException if the child step's execute() throws it
+   */
+  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] childIters = child.execute();
+    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
+    for (int i = 0; i < childIters.length; i++) {
+      iterators[i] = getIterator(childIters[i]);
+    }
+    return iterators;
+  }
+
+  /**
+   * Create the iterator using child iterator. Every batch taken from the child iterator is
+   * passed through processRowBatch() before it is returned.
+   * @param childIter iterator supplying the row batches to process
+   * @return new iterator with step specific processing.
+   */
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next());
+      }
+    };
+  }
+
+  /**
+   * Process the batch of rows as per the step logic, applying processRow() to every row.
+   *
+   * @param rowBatch batch whose rows are read until hasNext() returns false
+   * @return new batch holding the processed rows.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
+    CarbonRowBatch newBatch = new CarbonRowBatch(rowBatch.getSize());
+    while (rowBatch.hasNext()) {
+      newBatch.addRow(processRow(rowBatch.next()));
+    }
+    return newBatch;
+  }
+
+  /**
+   * Process the row as per the step logic. Called by processRowBatch() for each row of a batch.
+   *
+   * @param row row to process
+   * @return processed row.
+   */
+  protected abstract CarbonRow processRow(CarbonRow row);
+
+  /**
+   * Get the step name for logging purpose.
+   * @return Step name
+   */
+  protected abstract String getStepName();
+
+
+  /**
+   * Close all resources. Called after execute() is finished, in both success and failure cases.
+   * Only the first call has an effect; it logs the row count and then closes the child step.
+   */
+  public void close() {
+    if (!closed) {
+      closed = true;
+      LOGGER.info("Total rows processed in step " + this.getStepName() + ": " + rowCounter.get());
+      if (child != null) {
+        child.close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
new file mode 100644
index 0000000..bc0ce3a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/BadRecordsLogger.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+
+public class BadRecordsLogger {
+
+  /**
+   * Comment for <code>LOGGER</code>
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(BadRecordsLogger.class.getName());
+  /**
+   * Maps a task key to "Partially" once a bad record was seen for it; shared by all instances,
+   * hence wrapped in a synchronized map. Queried via hasBadRecord() / removeBadRecordKey().
+   */
+  private static Map<String, String> badRecordEntry = java.util.Collections.synchronizedMap(
+      new HashMap<String, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE));
+  /**
+   * File Name
+   */
+  private String fileName;
+  /**
+   * Store path
+   */
+  private String storePath;
+  /**
+   * Writer for the bad record log file and the stream it wraps; created on first write
+   */
+  private BufferedWriter bufferedWriter;
+  private DataOutputStream outStream;
+  /**
+   * Writer for the bad record csv file and the stream it wraps; created on first write
+   */
+  private BufferedWriter bufferedCSVWriter;
+  private DataOutputStream outCSVStream;
+  /**
+   * bad record log file path
+   */
+  private String logFilePath;
+  /**
+   * csv file path
+   */
+  private String csvFilePath;
+
+  /**
+   * task key which is DatabaseName/TableName/tablename
+   */
+  private String taskKey;
+
+  private boolean badRecordsLogRedirect;
+
+  private boolean badRecordLoggerEnable;
+
+  private boolean badRecordConvertNullDisable;
+
+  private boolean isDataLoadFail;
+
+  // private final Object syncObject =new Object();
+
+  public BadRecordsLogger(String key, String fileName, String storePath,
+      boolean badRecordsLogRedirect, boolean badRecordLoggerEnable,
+      boolean badRecordConvertNullDisable, boolean isDataLoadFail) {
+    // Initially no bad rec
+    taskKey = key;
+    this.fileName = fileName;
+    this.storePath = storePath;
+    this.badRecordsLogRedirect = badRecordsLogRedirect;
+    this.badRecordLoggerEnable = badRecordLoggerEnable;
+    this.badRecordConvertNullDisable = badRecordConvertNullDisable;
+    this.isDataLoadFail = isDataLoadFail;
+  }
+
+  /**
+   * @param key DatabaseName/TableName/tablename
+   * @return "Partially" if a bad record was registered for the key, otherwise null
+   */
+  public static String hasBadRecord(String key) {
+    return badRecordEntry.get(key);
+  }
+
+  /**
+   * @param key DatabaseName/TableName/tablename
+   * @return the value that was registered for the removed key, or null if there was none
+   */
+  public static String removeBadRecordKey(String key) {
+    return badRecordEntry.remove(key);
+  }
+
+  public void addBadRecordsToBuilder(Object[] row, String reason)
+      throws CarbonDataLoadingException {
+    if (badRecordsLogRedirect || badRecordLoggerEnable) {  // join the row's columns with ','
+      StringBuilder logStrings = new StringBuilder();
+      int size = row.length;
+      int count = size;  // columns not yet appended; no ',' follows the last one
+      for (int i = 0; i < size; i++) {
+        if (null == row[i]) {  // the first null column ends the line
+          char ch =
+              logStrings.length() > 0 ? logStrings.charAt(logStrings.length() - 1) : (char) -1;
+          if (ch == ',') {
+            logStrings = logStrings.deleteCharAt(logStrings.lastIndexOf(","));
+          }
+          break;  // remaining columns are not written
+        } else if (CarbonCommonConstants.MEMBER_DEFAULT_VAL.equals(row[i].toString())) {
+          logStrings.append("null");  // default-value marker is written as the text null
+        } else {
+          logStrings.append(row[i]);
+        }
+        if (count > 1) {
+          logStrings.append(',');
+        }
+        count--;
+      }
+      if (badRecordsLogRedirect) {  // the csv file receives the row only
+        writeBadRecordsToCSVFile(logStrings);
+      }
+      if (badRecordLoggerEnable) {  // the log file receives the row followed by the reason
+        logStrings.append("----->");
+        if (null != reason) {
+          if (reason.indexOf(CarbonCommonConstants.MEMBER_DEFAULT_VAL) > -1) {
+            logStrings
+                .append(reason.replace(CarbonCommonConstants.MEMBER_DEFAULT_VAL, "null"));
+          } else {
+            logStrings.append(reason);
+          }
+        }
+        writeBadRecordsToFile(logStrings);
+      }
+    } else {
+      // nothing is written, but the task is still registered as having bad records so
+      // that the load status can be reported as partial success
+      badRecordEntry.put(taskKey, "Partially");
+    }
+  }
+
+  /**
+   * Appends the text as one line to the bad record log file, opening the writer on first use.
+   */
+  private synchronized void writeBadRecordsToFile(StringBuilder logStrings)
+      throws CarbonDataLoadingException {
+    if (null == logFilePath) {
+      logFilePath =
+          this.storePath + File.separator + this.fileName + CarbonCommonConstants.LOG_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    }
+    try {
+      if (null == bufferedWriter) {
+        FileType fileType = FileFactory.getFileType(storePath);
+        if (!FileFactory.isFileExist(this.storePath, fileType)) {
+          // create the folders if not exist
+          FileFactory.mkdirs(this.storePath, fileType);
+
+          // create the files
+          FileFactory.createNewFile(logFilePath, fileType);
+        }
+
+        outStream = FileFactory.getDataOutputStream(logFilePath, fileType);
+
+        bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      }
+      bufferedWriter.write(logStrings.toString());
+      bufferedWriter.newLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Bad Log Files not found");
+      throw new CarbonDataLoadingException("Bad Log Files not found", e);
+    } catch (IOException e) {
+      LOGGER.error("Error While writing bad record log File");
+      throw new CarbonDataLoadingException("Error While writing bad record log File", e);
+    } finally {
+      // runs on success and on failure alike: the task key is always registered in
+      // badRecordEntry, so hasBadRecord(taskKey) returns "Partially" once a bad record
+      // was handed to this method
+      badRecordEntry.put(taskKey, "Partially");
+    }
+  }
+
+  /**
+   * Appends the row having bad record as one line to the csv file, opening it on first use.
+   *
+   * @param logStrings text of the bad row as built by addBadRecordsToBuilder
+   */
+  private synchronized void writeBadRecordsToCSVFile(StringBuilder logStrings)
+      throws CarbonDataLoadingException {
+    if (null == csvFilePath) {
+      csvFilePath =
+          this.storePath + File.separator + this.fileName + CarbonCommonConstants.CSV_FILE_EXTENSION
+              + CarbonCommonConstants.FILE_INPROGRESS_STATUS;
+    }
+    try {
+      if (null == bufferedCSVWriter) {
+        FileType fileType = FileFactory.getFileType(storePath);
+        if (!FileFactory.isFileExist(this.storePath, fileType)) {
+          // create the folders if not exist
+          FileFactory.mkdirs(this.storePath, fileType);
+
+          // create the files
+          FileFactory.createNewFile(csvFilePath, fileType);
+        }
+
+        outCSVStream = FileFactory.getDataOutputStream(csvFilePath, fileType);
+
+        bufferedCSVWriter = new BufferedWriter(new OutputStreamWriter(outCSVStream,
+            Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      }
+      bufferedCSVWriter.write(logStrings.toString());
+      bufferedCSVWriter.newLine();
+    } catch (FileNotFoundException e) {
+      LOGGER.error("Bad record csv Files not found");
+      throw new CarbonDataLoadingException("Bad record csv Files not found", e);
+    } catch (IOException e) {
+      LOGGER.error("Error While writing bad record csv File");
+      throw new CarbonDataLoadingException("Error While writing bad record csv File", e);
+    }
+    finally {
+      badRecordEntry.put(taskKey, "Partially");
+    }
+  }
+
+  public boolean isBadRecordConvertNullDisable() {
+    return badRecordConvertNullDisable;
+  }
+
+  public boolean isDataLoadFail() {
+    return isDataLoadFail;
+  }
+
+  public boolean isBadRecordLoggerEnable() {
+    return badRecordLoggerEnable;
+  }
+
+  public boolean isBadRecordsLogRedirect() {
+    return badRecordsLogRedirect;
+  }
+
+  /**
+   * Passes both writers and their underlying streams to CarbonUtil.closeStreams().
+   */
+  public synchronized void closeStreams() {
+    CarbonUtil.closeStreams(bufferedWriter, outStream, bufferedCSVWriter, outCSVStream);
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
new file mode 100644
index 0000000..7309c91
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.BucketingInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.processing.loading.converter.DictionaryCardinalityFinder;
+
+public class CarbonDataLoadConfiguration {
+
+  private DataField[] dataFields;
+
+  private AbsoluteTableIdentifier tableIdentifier;
+
+  private String[] header;
+
+  private String partitionId;
+
+  private String segmentId;
+
+  private String taskNo;
+
+  private BucketingInfo bucketingInfo;
+
+  private Map<String, Object> dataLoadProperties = new HashMap<>();
+
+  /**
+   *  Use one pass to generate dictionary
+   */
+  private boolean useOnePass;
+
+  /**
+   * dictionary server host
+   */
+  private String dictionaryServerHost;
+
+  /**
+   * dictionary server port
+   */
+  private int dictionaryServerPort;
+
+  private boolean preFetch;
+
+  private int dimensionCount;
+
+  private int measureCount;
+
+  private int noDictionaryCount;
+
+  private int complexColumnCount;
+
+  /**
+   * schema updated time stamp to be used for restructure scenarios
+   */
+  private long schemaUpdatedTimeStamp;
+
+  private DictionaryCardinalityFinder cardinalityFinder;
+
+  private int numberOfSortColumns;
+
+  private int numberOfNoDictSortColumns;
+
+  // contains metadata used in write step of loading process
+  private TableSpec tableSpec;
+
+  public CarbonDataLoadConfiguration() {
+  }
+
+  /** Stores the fields and recounts the column categories from zero (safe to call again). */
+  public void setDataFields(DataField[] dataFields) {
+    this.dataFields = dataFields;
+    dimensionCount = noDictionaryCount = complexColumnCount = measureCount = 0;
+    for (DataField dataField : dataFields) {
+      CarbonColumn column = dataField.getColumn();
+      if (column.isDimension()) {
+        dimensionCount++;
+        if (!dataField.hasDictionaryEncoding()) {
+          noDictionaryCount++;
+        }
+      }
+      if (column.isComplex()) {
+        complexColumnCount++;
+      }
+      if (column.isMeasure()) {
+        measureCount++;
+      }
+    }
+  }
+
+  public DataField[] getDataFields() {
+    return dataFields;
+  }
+
+  public int getDimensionCount() {
+    return dimensionCount;
+  }
+
+  public int getNoDictionaryCount() {
+    return noDictionaryCount;
+  }
+
+  public int getComplexColumnCount() {
+    return complexColumnCount;
+  }
+
+  public int getMeasureCount() {
+    return measureCount;
+  }
+
+  public void setNumberOfSortColumns(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  public int getNumberOfSortColumns() {
+    return this.numberOfSortColumns;
+  }
+
+  public boolean isSortTable() {
+    return this.numberOfSortColumns > 0;
+  }
+
+  public void setNumberOfNoDictSortColumns(int numberOfNoDictSortColumns) {
+    this.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
+  }
+
+  public int getNumberOfNoDictSortColumns() {
+    return this.numberOfNoDictSortColumns;
+  }
+
+  public String[] getHeader() {
+    return header;
+  }
+
+  public void setHeader(String[] header) {
+    this.header = header;
+  }
+
+  public AbsoluteTableIdentifier getTableIdentifier() {
+    return tableIdentifier;
+  }
+
+  public void setTableIdentifier(AbsoluteTableIdentifier tableIdentifier) {
+    this.tableIdentifier = tableIdentifier;
+  }
+
+  public String getPartitionId() {
+    return partitionId;
+  }
+
+  public void setPartitionId(String partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getTaskNo() {
+    return taskNo;
+  }
+
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  public void setDataLoadProperty(String key, Object value) {
+    dataLoadProperties.put(key, value);
+  }
+
+  public Object getDataLoadProperty(String key) {
+    return dataLoadProperties.get(key);
+  }
+
+  public BucketingInfo getBucketingInfo() {
+    return bucketingInfo;
+  }
+
+  public void setBucketingInfo(BucketingInfo bucketingInfo) {
+    this.bucketingInfo = bucketingInfo;
+  }
+
+  public boolean getUseOnePass() {
+    return useOnePass;
+  }
+
+  public void setUseOnePass(boolean useOnePass) {
+    this.useOnePass = useOnePass;
+  }
+
+  public String getDictionaryServerHost() {
+    return dictionaryServerHost;
+  }
+
+  public void setDictionaryServerHost(String dictionaryServerHost) {
+    this.dictionaryServerHost = dictionaryServerHost;
+  }
+
+  public int getDictionaryServerPort() {
+    return dictionaryServerPort;
+  }
+
+  public void setDictionaryServerPort(int dictionaryServerPort) {
+    this.dictionaryServerPort = dictionaryServerPort;
+  }
+
+  public boolean isPreFetch() {
+    return preFetch;
+  }
+
+  public void setPreFetch(boolean preFetch) {
+    this.preFetch = preFetch;
+  }
+
+  public long getSchemaUpdatedTimeStamp() {
+    return schemaUpdatedTimeStamp;
+  }
+
+  public void setSchemaUpdatedTimeStamp(long schemaUpdatedTimeStamp) {
+    this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
+  }
+
+  public DictionaryCardinalityFinder getCardinalityFinder() {
+    return cardinalityFinder;
+  }
+
+  public void setCardinalityFinder(DictionaryCardinalityFinder cardinalityFinder) {
+    this.cardinalityFinder = cardinalityFinder;
+  }
+
+  public DataType[] getMeasureDataType() {
+    List<Integer> measureIndexes = new ArrayList<>(dataFields.length);
+    int measureCount = 0;
+    for (int i = 0; i < dataFields.length; i++) {
+      if (!dataFields[i].getColumn().isDimension()) {
+        measureIndexes.add(i);
+        measureCount++;
+      }
+    }
+
+    DataType[] type = new DataType[measureCount];
+    for (int i = 0; i < type.length; i++) {
+      type[i] = dataFields[measureIndexes.get(i)].getColumn().getDataType();
+    }
+    return type;
+  }
+
+  /** Non-zero cardinalities; overwritten in place with Integer.MAX_VALUE for non-sort tables. */
+  public int[] calcDimensionLengths() {
+    int[] cardinality = getCardinalityFinder().getCardinality();
+    List<Integer> kept = new ArrayList<Integer>();
+    for (int i = 0; i < cardinality.length; i++) {
+      if (cardinality[i] == 0) {
+        continue;
+      }
+      if (!isSortTable()) {
+        cardinality[i] = Integer.MAX_VALUE;
+      }
+      kept.add(cardinality[i]);
+    }
+    int[] dimLens = new int[kept.size()];
+    for (int i = 0; i < dimLens.length; i++) {
+      dimLens[i] = kept.get(i);
+    }
+    return dimLens;
+  }
+
+  /** Creates one key generator per dimension length, each from a one-element length array. */
+  public KeyGenerator[] createKeyGeneratorForComplexDimension() {
+    int[] lengths = calcDimensionLengths();
+    KeyGenerator[] generators = new KeyGenerator[lengths.length];
+    for (int idx = 0; idx < generators.length; idx++) {
+      generators[idx] = KeyGeneratorFactory.getKeyGenerator(new int[] { lengths[idx] });
+    }
+    return generators;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  public void setTableSpec(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+}