Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/10 14:49:26 UTC

[4/5] incubator-carbondata git commit: Data load integration of all steps for removing kettle

Data load integration of all steps for removing kettle


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/496cde46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/496cde46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/496cde46

Branch: refs/heads/master
Commit: 496cde46ddd43c14a07172a913550f6fd47bd7c5
Parents: ae9d88b
Author: ravipesala <ra...@gmail.com>
Authored: Wed Nov 9 22:38:47 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 9 22:38:47 2016 +0530

----------------------------------------------------------------------
 .../core/carbon/CarbonTableIdentifier.java      |   8 +
 .../store/dataholder/CarbonWriteDataHolder.java |  42 ++
 .../TimeStampDirectDictionaryGenerator.java     |   7 +-
 .../carbondata/core/load/BlockDetails.java      |  31 +-
 .../carbondata/hadoop/csv/CSVInputFormat.java   |  54 ++
 .../recorditerator/RecordReaderIterator.java    |  68 ++
 .../hadoop/test/util/StoreCreator.java          |   4 +-
 .../sql/common/util/CarbonHiveContext.scala     |   3 +-
 integration/spark/pom.xml                       |   1 +
 .../spark/merger/RowResultMerger.java           |   2 +-
 .../carbondata/spark/load/CarbonLoadModel.java  | 645 ------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java |   1 +
 .../spark/load/DeleteLoadFolders.java           |   1 +
 .../spark/merger/CarbonDataMergerUtil.java      |   2 +-
 .../carbondata/spark/util/LoadMetadataUtil.java |   2 +-
 .../spark/CarbonDataFrameWriter.scala           |   3 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  58 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |   3 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   3 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 272 ++++++++
 .../spark/util/GlobalDictionaryUtil.scala       |   2 +-
 .../apache/spark/mapred/SparkMapRedUtil.scala   |  32 +
 .../org/apache/spark/sql/CarbonSqlParser.scala  |   3 +-
 .../execution/command/carbonTableSchema.scala   |  59 +-
 .../spark/sql/hive/CarbonStrategies.scala       |   4 +-
 .../org/apache/spark/util/SplitUtils.scala      |   2 +-
 .../TestLoadDataWithNotProperInputFile.scala    |   2 +-
 .../spark/util/AllDictionaryTestCase.scala      |   4 +-
 .../AutoHighCardinalityIdentifyTestCase.scala   |   3 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   3 +-
 ...GlobalDictionaryUtilConcurrentTestCase.scala |   3 +-
 .../util/GlobalDictionaryUtilTestCase.scala     |   4 +-
 .../sql/common/util/CarbonHiveContext.scala     |   3 +-
 pom.xml                                         |   7 +
 .../processing/csvload/DataGraphExecuter.java   |  45 +-
 .../processing/csvload/GraphExecutionUtil.java  |  61 --
 .../processing/datatypes/ArrayDataType.java     |  27 +-
 .../processing/datatypes/GenericDataType.java   |  18 +-
 .../processing/datatypes/PrimitiveDataType.java |  73 ++-
 .../processing/datatypes/StructDataType.java    |  37 +-
 .../processing/mdkeygen/MDKeyGenStep.java       |   2 +-
 .../processing/model/CarbonLoadModel.java       | 647 +++++++++++++++++++
 .../processing/newflow/DataField.java           |  17 +-
 .../processing/newflow/DataLoadExecutor.java    |  72 +++
 .../newflow/DataLoadProcessBuilder.java         | 167 +++++
 .../constants/DataLoadProcessorConstants.java   |   8 +
 .../newflow/converter/BadRecordLogHolder.java   |  46 ++
 .../newflow/converter/FieldConverter.java       |   4 +-
 .../newflow/converter/RowConverter.java         |   4 +
 .../AbstractDictionaryFieldConverterImpl.java   |   4 +-
 .../impl/ComplexFieldConverterImpl.java         |  36 +-
 .../impl/DictionaryFieldConverterImpl.java      |  31 +-
 .../DirectDictionaryFieldConverterImpl.java     |  51 +-
 .../converter/impl/FieldEncoderFactory.java     |  86 ++-
 .../impl/MeasureFieldConverterImpl.java         |  83 +++
 .../impl/NonDictionaryFieldConverterImpl.java   |  20 +-
 .../converter/impl/RowConverterImpl.java        |  50 +-
 .../newflow/dictionary/DirectDictionary.java    |  59 ++
 .../dictionary/PreCreatedDictionary.java        |   8 +-
 .../exception/BadRecordFoundException.java      |  67 ++
 .../newflow/parser/CarbonParserFactory.java     |  25 +-
 .../newflow/parser/GenericParser.java           |   4 +-
 .../newflow/parser/impl/ArrayParserImpl.java    |  33 +-
 .../parser/impl/PrimitiveParserImpl.java        |   2 +-
 .../newflow/parser/impl/RowParserImpl.java      |  68 +-
 .../newflow/parser/impl/StructParserImpl.java   |  27 +-
 .../processing/newflow/row/CarbonRow.java       |  18 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |  28 +-
 .../sort/impl/SortPreparatorIterator.java       | 147 -----
 .../steps/DataConverterProcessorStepImpl.java   | 106 ++-
 .../steps/DataWriterProcessorStepImpl.java      | 216 +++++++
 .../newflow/steps/DummyClassForTest.java        |  84 +++
 .../newflow/steps/InputProcessorStepImpl.java   |  47 +-
 .../newflow/steps/SortProcessorStepImpl.java    |   1 +
 .../writer/DataWriterProcessorStepImpl.java     | 222 -------
 .../sortdata/IntermediateFileMerger.java        |  90 ++-
 .../sortdata/NewRowComparator.java              |  73 +++
 .../sortdata/NewRowComparatorForNormalDims.java |  61 ++
 .../sortandgroupby/sortdata/SortDataRows.java   | 112 +++-
 .../sortandgroupby/sortdata/SortParameters.java |  44 +-
 .../sortdata/SortTempFileChunkHolder.java       | 119 +++-
 .../store/CarbonFactDataHandlerColumnar.java    | 309 ++++++++-
 .../store/CarbonFactDataHandlerModel.java       |  40 +-
 .../store/SingleThreadFinalSortFilesMerger.java |   8 +-
 .../csvbased/BadRecordsLogger.java              | 249 +++++++
 .../csvbased/BadRecordslogger.java              | 235 -------
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |  57 +-
 .../util/CarbonDataProcessorUtil.java           | 226 ++++++-
 .../processing/util/RemoveDictionaryUtil.java   |   9 +
 .../carbondata/test/util/StoreCreator.java      |   3 +-
 91 files changed, 4027 insertions(+), 1705 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
index bb8a816..439f7b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/CarbonTableIdentifier.java
@@ -19,6 +19,7 @@
 
 package org.apache.carbondata.core.carbon;
 
+import java.io.File;
 import java.io.Serializable;
 
 /**
@@ -78,6 +79,13 @@ public class CarbonTableIdentifier implements Serializable {
     return databaseName + '_' + tableName;
   }
 
+  /**
+   * Creates the key for the bad record logger.
+   */
+  public String getBadRecordLoggerKey() {
+    return databaseName + File.separator + tableName + '_' + tableName;
+  }
+
   @Override public int hashCode() {
     final int prime = 31;
     int result = 1;

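As an illustration of the new key format above, a minimal sketch (the three-argument constructor of database name, table name and table id is an assumption, it is not part of this hunk); note that the table name appears on both sides of the underscore:

    import org.apache.carbondata.core.carbon.CarbonTableIdentifier;

    public class BadRecordKeySketch {
      public static void main(String[] args) {
        CarbonTableIdentifier identifier = new CarbonTableIdentifier("default", "sales", "1");
        // databaseName + File.separator + tableName + '_' + tableName:
        // prints "default/sales_sales" on a Unix-like file system
        System.out.println(identifier.getBadRecordLoggerKey());
      }
    }
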
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
index 951857e..08f7786 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/dataholder/CarbonWriteDataHolder.java
@@ -36,6 +36,11 @@ public class CarbonWriteDataHolder {
   private byte[][] byteValues;
 
   /**
+   * Byte values for no-dictionary columns in the non-kettle flow.
+   */
+  private byte[][][] byteValuesForNonDictionary;
+
+  /**
    * byteValues
    */
   private byte[][][] columnByteValues;
@@ -69,6 +74,7 @@ public class CarbonWriteDataHolder {
 
   /**
    * Method to initialise double array
+   * TODO: remove after the kettle flow is removed.
    *
    * @param size
    */
@@ -82,6 +88,27 @@ public class CarbonWriteDataHolder {
   }
 
   /**
+   * Method to initialise the byte array for the non-kettle flow
+   *
+   * @param size
+   */
+  public void initialiseByteArrayValuesWithOutKettle(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+
+    byteValues = new byte[size][];
+  }
+
+  public void initialiseByteArrayValuesForNonDictionary(int size) {
+    if (size < 1) {
+      throw new IllegalArgumentException("Invalid array size");
+    }
+
+    byteValuesForNonDictionary = new byte[size][][];
+  }
+
+  /**
    * Method to initialise long array
    *
    * @param size
@@ -127,6 +154,12 @@ public class CarbonWriteDataHolder {
     if (null != value) totalSize += value.length;
   }
 
+  public void setWritableNonDictByteArrayValueByIndex(int index, byte[][] value) {
+    byteValuesForNonDictionary[index] = value;
+    size++;
+    if (null != value) totalSize += value.length;
+  }
+
   /**
    * set byte array value by index
    */
@@ -172,6 +205,15 @@ public class CarbonWriteDataHolder {
     return byteValues;
   }
 
+  public byte[][][] getNonDictByteArrayValues() {
+    if (size < byteValuesForNonDictionary.length) {
+      byte[][][] temp = new byte[size][][];
+      System.arraycopy(byteValuesForNonDictionary, 0, temp, 0, size);
+      byteValuesForNonDictionary = temp;
+    }
+    return byteValuesForNonDictionary;
+  }
+
   /**
    * Get Writable Double Values
    *

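For illustration, a minimal sketch of how the new non-dictionary holder might be used in the non-kettle flow (the no-argument constructor, row counts and values are assumptions): initialise with a capacity, set one byte[][] per row, then read back the array trimmed to the rows actually written.

    import org.apache.carbondata.core.datastorage.store.dataholder.CarbonWriteDataHolder;

    public class NonDictHolderSketch {
      public static void main(String[] args) {
        CarbonWriteDataHolder holder = new CarbonWriteDataHolder();
        holder.initialiseByteArrayValuesForNonDictionary(1000);  // capacity of 1000 rows
        for (int row = 0; row < 10; row++) {
          // one byte[] per no-dictionary column of this row (dummy values)
          byte[][] noDictKeys = new byte[][] { ("value" + row).getBytes() };
          holder.setWritableNonDictByteArrayValueByIndex(row, noDictKeys);
        }
        // returns the array trimmed to the 10 rows that were set
        byte[][][] values = holder.getNonDictByteArrayValues();
        System.out.println(values.length);  // 10
      }
    }
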
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
index a9ceab4..54d2965 100644
--- a/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/keygenerator/directdictionary/timestamp/TimeStampDirectDictionaryGenerator.java
@@ -154,7 +154,12 @@ public class TimeStampDirectDictionaryGenerator implements DirectDictionaryGener
   private int getDirectSurrogateForMember(String memberStr) {
     Date dateToStr = null;
     try {
-      dateToStr = simpleDateFormatLocal.get().parse(memberStr);
+      SimpleDateFormat simpleDateFormat = simpleDateFormatLocal.get();
+      if (null == simpleDateFormat) {
+        initialize();
+        simpleDateFormat = simpleDateFormatLocal.get();
+      }
+      dateToStr = simpleDateFormat.parse(memberStr);
     } catch (ParseException e) {
       LOGGER.debug(
           "Cannot convert " + memberStr + " to Time/Long type value. Value considered as null." + e

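The null check covers threads that never called initialize(): SimpleDateFormat is not thread-safe, so the generator keeps one instance per thread in a ThreadLocal. A standalone sketch of that pattern (the class name and date pattern are illustrative, not CarbonData's):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class ThreadLocalFormatSketch {
      private static final ThreadLocal<SimpleDateFormat> FORMAT_LOCAL = new ThreadLocal<>();

      static void initialize() {
        FORMAT_LOCAL.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
      }

      static Date parse(String memberStr) throws ParseException {
        SimpleDateFormat format = FORMAT_LOCAL.get();
        if (format == null) {  // this thread never ran initialize()
          initialize();
          format = FORMAT_LOCAL.get();
        }
        return format.parse(memberStr);
      }

      public static void main(String[] args) throws ParseException {
        System.out.println(parse("2016-11-09 22:38:47"));
      }
    }
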
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java b/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
index c3fd997..69c7cf5 100644
--- a/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/load/BlockDetails.java
@@ -23,10 +23,14 @@ import java.io.Serializable;
 
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
 /**
  * blocks info
+ * TODO: remove this class after the kettle flow is removed.
  */
-public class BlockDetails implements Serializable {
+public class BlockDetails extends FileSplit implements Serializable {
 
   /**
    * serialization version
@@ -41,8 +45,9 @@ public class BlockDetails implements Serializable {
   // locations where this block exists
   private String[] locations;
 
-  public BlockDetails(String filePath, long blockOffset, long blockLength, String[] locations) {
-    this.filePath = filePath;
+  public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
+    super(filePath, blockOffset, blockLength, locations);
+    this.filePath = filePath.toString();
     this.blockOffset = blockOffset;
     this.blockLength = blockLength;
     this.locations = locations;
@@ -52,18 +57,10 @@ public class BlockDetails implements Serializable {
     return blockOffset;
   }
 
-  public void setBlockOffset(long blockOffset) {
-    this.blockOffset = blockOffset;
-  }
-
   public long getBlockLength() {
     return blockLength;
   }
 
-  public void setBlockLength(long blockLength) {
-    this.blockLength = blockLength;
-  }
-
   public String getFilePath() {
     return FileFactory.getUpdatedFilePath(filePath);
   }
@@ -75,4 +72,16 @@ public class BlockDetails implements Serializable {
   public String[] getLocations() {
     return locations;
   }
+
+  /** The file containing this split's data. */
+  @Override
+  public Path getPath() { return new Path(filePath); }
+
+  /** The position of the first byte in the file to process. */
+  @Override
+  public long getStart() { return blockOffset; }
+
+  /** The number of bytes in the file to process. */
+  @Override
+  public long getLength() { return blockLength; }
 }

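Because BlockDetails now extends FileSplit, a block can be handed directly to code in the new flow that expects a Hadoop split. An illustrative sketch using only the constructor and overrides from this hunk (the path and host names are made up):

    import org.apache.carbondata.core.load.BlockDetails;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    public class BlockDetailsSketch {
      public static void main(String[] args) {
        Path file = new Path("hdfs://namenode/data/part-00000.csv");
        BlockDetails block = new BlockDetails(file, 0L, 128L * 1024 * 1024,
            new String[] { "host1", "host2" });
        FileSplit split = block;  // BlockDetails is now usable as a FileSplit
        System.out.println(split.getPath() + " start=" + split.getStart()
            + " length=" + split.getLength());
      }
    }
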
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
index 33484d7..3ea96ac 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
@@ -84,6 +84,60 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
   }
 
   /**
+   * Sets the comment character in the configuration. Default is '#'.
+   * @param commentChar
+   * @param configuration
+   */
+  public static void setCommentCharacter(String commentChar, Configuration configuration) {
+    if (commentChar != null && !commentChar.isEmpty()) {
+      configuration.set(COMMENT, commentChar);
+    }
+  }
+
+  /**
+   * Sets the CSV delimiter in the configuration. Default is ','.
+   * @param delimiter
+   * @param configuration
+   */
+  public static void setCSVDelimiter(String delimiter, Configuration configuration) {
+    if (delimiter != null && !delimiter.isEmpty()) {
+      configuration.set(DELIMITER, delimiter);
+    }
+  }
+
+  /**
+   * Sets the escape character in the configuration. Default is '\'.
+   * @param escapeCharacter
+   * @param configuration
+   */
+  public static void setEscapeCharacter(String escapeCharacter, Configuration configuration) {
+    if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
+      configuration.set(ESCAPE, escapeCharacter);
+    }
+  }
+
+  /**
+   * Sets whether the header should be read from the CSV file itself. Default is false.
+   * @param headerExtractEnable
+   * @param configuration
+   */
+  public static void setHeaderExtractionEnabled(boolean headerExtractEnable,
+      Configuration configuration) {
+    configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
+  }
+
+  /**
+   * Sets the quote character in the configuration. Default is '"'.
+   * @param quoteCharacter
+   * @param configuration
+   */
+  public static void setQuoteCharacter(String quoteCharacter, Configuration configuration) {
+    if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
+      configuration.set(QUOTE, quoteCharacter);
+    }
+  }
+
+  /**
    * Treats value as line in file. Key is null.
    */
   public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {

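Note that each setter above only writes to the configuration when a non-empty value is supplied, so the defaults survive. An illustrative sketch of wiring the options into a Hadoop configuration:

    import org.apache.carbondata.hadoop.csv.CSVInputFormat;
    import org.apache.hadoop.conf.Configuration;

    public class CsvConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        CSVInputFormat.setCSVDelimiter(",", conf);
        CSVInputFormat.setQuoteCharacter("\"", conf);
        CSVInputFormat.setEscapeCharacter("\\", conf);
        CSVInputFormat.setCommentCharacter("#", conf);
        // read the header row from the CSV file itself
        CSVInputFormat.setHeaderExtractionEnabled(true, conf);
      }
    }
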
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
new file mode 100644
index 0000000..478af0a
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.csv.recorditerator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.hadoop.io.StringArrayWritable;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+
+/**
+ * It is a wrapper iterator around {@link RecordReader}.
+ */
+public class RecordReaderIterator extends CarbonIterator<Object []> {
+
+  private RecordReader<NullWritable, StringArrayWritable> recordReader;
+
+  /**
+   * A small workaround to expose a RecordReader as an iterator. nextKeyValue() cannot be
+   * called once per hasNext() because each call advances the reader by one line. With this
+   * flag, hasNext() only checks whether a next row is present, while next() consumes the
+   * row and resets the flag.
+   */
+  private boolean isConsumed;
+
+  public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader) {
+    this.recordReader = recordReader;
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      if (!isConsumed) {
+        isConsumed = recordReader.nextKeyValue();
+        return isConsumed;
+      }
+      return isConsumed;
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  @Override
+  public Object[] next() {
+    try {
+      String[] data = recordReader.getCurrentValue().get();
+      isConsumed = false;
+      return data;
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+}

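An illustrative sketch of consuming the iterator; createReader and process are hypothetical helpers, not part of this commit. hasNext can be called repeatedly without skipping rows because the isConsumed flag defers the advance to next():

    RecordReader<NullWritable, StringArrayWritable> reader = createReader();  // hypothetical
    RecordReaderIterator iterator = new RecordReaderIterator(reader);
    while (iterator.hasNext()) {
      Object[] row = iterator.next();  // consumes the current row
      process(row);                    // hypothetical sink
    }
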
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index 0280820..929e404 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -19,6 +19,8 @@
 package org.apache.carbondata.hadoop.test.util;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.fs.Path;
+
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
@@ -350,7 +352,7 @@ public class StoreCreator {
     DataProcessTaskStatus dataProcessTaskStatus = new DataProcessTaskStatus(databaseName, tableName);
     dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
     SchemaInfo info = new SchemaInfo();
-    BlockDetails blockDetails = new BlockDetails(loadModel.getFactFilePath(),
+    BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
         0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
     GraphGenerator.blockInfo.put("qwqwq", new BlockDetails[] { blockDetails });
     dataProcessTaskStatus.setBlocksID("qwqwq");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
----------------------------------------------------------------------
diff --git a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
index 0c3ce9f..5a5d0dd 100644
--- a/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
+++ b/integration-testcases/src/test/scala/org/apache/spark/sql/common/util/CarbonHiveContext.scala
@@ -31,7 +31,8 @@ class LocalSQLContext(val hdfsCarbonBasePath: String)
   extends CarbonContext(new SparkContext(new SparkConf()
     .setAppName("CarbonSpark")
     .setMaster("local[2]")
-    .set("spark.sql.shuffle.partitions", "20")),
+    .set("spark.sql.shuffle.partitions", "20")
+    .set("use_kettle_default", "true")),
     hdfsCarbonBasePath,
     hdfsCarbonBasePath) {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 883a169..eff18f1 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -182,6 +182,7 @@
           </environmentVariables>
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
+            <use.kettle>${use.kettle}</use.kettle>
           </systemProperties>
         </configuration>
         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
index 7c3d2a7..1028f78 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/integration/spark/merger/RowResultMerger.java
@@ -46,6 +46,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
@@ -53,7 +54,6 @@ import org.apache.carbondata.processing.store.CarbonFactHandler;
 import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.apache.carbondata.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.spark.load.CarbonLoadModel;
 
 /**
  * This is the Merger class responsible for the merging of the segments.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
deleted file mode 100644
index b33601d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoadModel.java
+++ /dev/null
@@ -1,645 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- *
- */
-package org.apache.carbondata.spark.load;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-
-public class CarbonLoadModel implements Serializable {
-  /**
-   *
-   */
-  private static final long serialVersionUID = 6580168429197697465L;
-
-  private String databaseName;
-
-  private String tableName;
-
-  private String factFilePath;
-
-  private String dimFolderPath;
-
-  private String colDictFilePath;
-
-  private String partitionId;
-
-  private CarbonDataLoadSchema carbonDataLoadSchema;
-
-  private String[] aggTables;
-
-  private String aggTableName;
-
-  private boolean aggLoadRequest;
-
-  private String storePath;
-
-  private boolean isRetentionRequest;
-
-  private List<String> factFilesToProcess;
-  private String csvHeader;
-  private String csvDelimiter;
-  private String complexDelimiterLevel1;
-  private String complexDelimiterLevel2;
-
-  private boolean isDirectLoad;
-  private List<LoadMetadataDetails> loadMetadataDetails;
-
-  private String blocksID;
-
-  /**
-   *  Map from carbon dimension to pre defined dict file path
-   */
-  private HashMap<CarbonDimension, String> predefDictMap;
-
-  /**
-   * task id, each spark task has a unique id
-   */
-  private String taskNo;
-  /**
-   * new load start time
-   */
-  private String factTimeStamp;
-  /**
-   * load Id
-   */
-  private String segmentId;
-
-  private String allDictPath;
-
-  /**
-   * escape Char
-   */
-  private String escapeChar;
-
-  /**
-   * quote Char
-   */
-  private String quoteChar;
-
-  /**
-   * comment Char
-   */
-  private String commentChar;
-
-  private String dateFormat;
-
-  /**
-   * defines the string that should be treated as null while loadind data
-   */
-  private String serializationNullFormat;
-
-  /**
-   * defines the string to specify whether the bad record logger should be enabled or not
-   */
-  private String badRecordsLoggerEnable;
-
-  /**
-   * defines the option to specify the bad record logger action
-   */
-  private String badRecordsAction;
-
-  /**
-   * Max number of columns that needs to be parsed by univocity parser
-   */
-  private String maxColumns;
-
-  /**
-   * the key of RDD Iterator in RDD iterator Map
-   */
-  private String rddIteratorKey;
-
-  /**
-   * get escape char
-   * @return
-   */
-  public String getEscapeChar() {
-    return escapeChar;
-  }
-
-  /**
-   * set escape char
-   * @param escapeChar
-   */
-  public void setEscapeChar(String escapeChar) {
-    this.escapeChar = escapeChar;
-  }
-
-  /**
-   * get blocck id
-   *
-   * @return
-   */
-  public String getBlocksID() {
-    return blocksID;
-  }
-
-  /**
-   * set block id for carbon load model
-   *
-   * @param blocksID
-   */
-  public void setBlocksID(String blocksID) {
-    this.blocksID = blocksID;
-  }
-
-  public String getCsvDelimiter() {
-    return csvDelimiter;
-  }
-
-  public void setCsvDelimiter(String csvDelimiter) {
-    this.csvDelimiter = csvDelimiter;
-  }
-
-  public String getComplexDelimiterLevel1() {
-    return complexDelimiterLevel1;
-  }
-
-  public void setComplexDelimiterLevel1(String complexDelimiterLevel1) {
-    this.complexDelimiterLevel1 = complexDelimiterLevel1;
-  }
-
-  public String getComplexDelimiterLevel2() {
-    return complexDelimiterLevel2;
-  }
-
-  public void setComplexDelimiterLevel2(String complexDelimiterLevel2) {
-    this.complexDelimiterLevel2 = complexDelimiterLevel2;
-  }
-
-  public boolean isDirectLoad() {
-    return isDirectLoad;
-  }
-
-  public void setDirectLoad(boolean isDirectLoad) {
-    this.isDirectLoad = isDirectLoad;
-  }
-
-  public String getAllDictPath() {
-    return allDictPath;
-  }
-
-  public void setAllDictPath(String allDictPath) {
-    this.allDictPath = allDictPath;
-  }
-
-  public List<String> getFactFilesToProcess() {
-    return factFilesToProcess;
-  }
-
-  public void setFactFilesToProcess(List<String> factFilesToProcess) {
-    this.factFilesToProcess = factFilesToProcess;
-  }
-
-  public String getCsvHeader() {
-    return csvHeader;
-  }
-
-  public void setCsvHeader(String csvHeader) {
-    this.csvHeader = csvHeader;
-  }
-
-  public void initPredefDictMap() {
-    predefDictMap = new HashMap<>();
-  }
-
-  public String getPredefDictFilePath(CarbonDimension dimension) {
-    return predefDictMap.get(dimension);
-  }
-
-  public void setPredefDictMap(CarbonDimension dimension, String predefDictFilePath) {
-    this.predefDictMap.put(dimension, predefDictFilePath);
-  }
-
-  /**
-   * @return carbon dataload schema
-   */
-  public CarbonDataLoadSchema getCarbonDataLoadSchema() {
-    return carbonDataLoadSchema;
-  }
-
-  /**
-   * @param carbonDataLoadSchema
-   */
-  public void setCarbonDataLoadSchema(CarbonDataLoadSchema carbonDataLoadSchema) {
-    this.carbonDataLoadSchema = carbonDataLoadSchema;
-  }
-
-  /**
-   * @return the databaseName
-   */
-  public String getDatabaseName() {
-    return databaseName;
-  }
-
-  /**
-   * @param databaseName the databaseName to set
-   */
-  public void setDatabaseName(String databaseName) {
-    this.databaseName = databaseName;
-  }
-
-  /**
-   * @return the tableName
-   */
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * @param tableName the tableName to set
-   */
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  /**
-   * @return the factFilePath
-   */
-  public String getFactFilePath() {
-    return factFilePath;
-  }
-
-  /**
-   * @param factFilePath the factFilePath to set
-   */
-  public void setFactFilePath(String factFilePath) {
-    this.factFilePath = factFilePath;
-  }
-
-  /**
-   *
-   * @return external column dictionary file path
-   */
-  public String getColDictFilePath() {
-    return colDictFilePath;
-  }
-
-  /**
-   * set external column dictionary file path
-   * @param colDictFilePath
-   */
-  public void setColDictFilePath(String colDictFilePath) {
-    this.colDictFilePath = colDictFilePath;
-  }
-
-  /**
-   * @return the dimFolderPath
-   */
-  public String getDimFolderPath() {
-    return dimFolderPath;
-  }
-
-  //TODO SIMIAN
-
-  /**
-   * @param dimFolderPath the dimFolderPath to set
-   */
-  public void setDimFolderPath(String dimFolderPath) {
-    this.dimFolderPath = dimFolderPath;
-  }
-
-  /**
-   * get copy with parition
-   *
-   * @param uniqueId
-   * @return
-   */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId) {
-    CarbonLoadModel copy = new CarbonLoadModel();
-    copy.tableName = tableName;
-    copy.dimFolderPath = dimFolderPath;
-    copy.factFilePath = factFilePath + '/' + uniqueId;
-    copy.databaseName = databaseName;
-    copy.partitionId = uniqueId;
-    copy.aggTables = aggTables;
-    copy.aggTableName = aggTableName;
-    copy.aggLoadRequest = aggLoadRequest;
-    copy.loadMetadataDetails = loadMetadataDetails;
-    copy.isRetentionRequest = isRetentionRequest;
-    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
-    copy.carbonDataLoadSchema = carbonDataLoadSchema;
-    copy.blocksID = blocksID;
-    copy.taskNo = taskNo;
-    copy.factTimeStamp = factTimeStamp;
-    copy.segmentId = segmentId;
-    copy.serializationNullFormat = serializationNullFormat;
-    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copy.badRecordsAction = badRecordsAction;
-    copy.escapeChar = escapeChar;
-    copy.quoteChar = quoteChar;
-    copy.commentChar = commentChar;
-    copy.maxColumns = maxColumns;
-    return copy;
-  }
-
-  /**
-   * get CarbonLoadModel with partition
-   *
-   * @param uniqueId
-   * @param filesForPartition
-   * @param header
-   * @param delimiter
-   * @return
-   */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
-      String header, String delimiter) {
-    CarbonLoadModel copyObj = new CarbonLoadModel();
-    copyObj.tableName = tableName;
-    copyObj.dimFolderPath = dimFolderPath;
-    copyObj.factFilePath = null;
-    copyObj.databaseName = databaseName;
-    copyObj.partitionId = uniqueId;
-    copyObj.aggTables = aggTables;
-    copyObj.aggTableName = aggTableName;
-    copyObj.aggLoadRequest = aggLoadRequest;
-    copyObj.loadMetadataDetails = loadMetadataDetails;
-    copyObj.isRetentionRequest = isRetentionRequest;
-    copyObj.carbonDataLoadSchema = carbonDataLoadSchema;
-    copyObj.csvHeader = header;
-    copyObj.factFilesToProcess = filesForPartition;
-    copyObj.isDirectLoad = true;
-    copyObj.csvDelimiter = delimiter;
-    copyObj.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copyObj.complexDelimiterLevel2 = complexDelimiterLevel2;
-    copyObj.blocksID = blocksID;
-    copyObj.taskNo = taskNo;
-    copyObj.factTimeStamp = factTimeStamp;
-    copyObj.segmentId = segmentId;
-    copyObj.serializationNullFormat = serializationNullFormat;
-    copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copyObj.badRecordsAction = badRecordsAction;
-    copyObj.escapeChar = escapeChar;
-    copyObj.quoteChar = quoteChar;
-    copyObj.commentChar = commentChar;
-    copyObj.dateFormat = dateFormat;
-    copyObj.maxColumns = maxColumns;
-    return copyObj;
-  }
-
-  /**
-   * @return the partitionId
-   */
-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  /**
-   * @param partitionId the partitionId to set
-   */
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  /**
-   * @return the aggTables
-   */
-  public String[] getAggTables() {
-    return aggTables;
-  }
-
-  /**
-   * @param aggTables the aggTables to set
-   */
-  public void setAggTables(String[] aggTables) {
-    this.aggTables = aggTables;
-  }
-
-  /**
-   * @return the aggLoadRequest
-   */
-  public boolean isAggLoadRequest() {
-    return aggLoadRequest;
-  }
-
-  /**
-   * @param aggLoadRequest the aggLoadRequest to set
-   */
-  public void setAggLoadRequest(boolean aggLoadRequest) {
-    this.aggLoadRequest = aggLoadRequest;
-  }
-
-  /**
-   * @param storePath The storePath to set.
-   */
-  public void setStorePath(String storePath) {
-    this.storePath = storePath;
-  }
-
-  /**
-   * @return Returns the aggTableName.
-   */
-  public String getAggTableName() {
-    return aggTableName;
-  }
-
-  /**
-   * @return Returns the factStoreLocation.
-   */
-  public String getStorePath() {
-    return storePath;
-  }
-
-  /**
-   * @param aggTableName The aggTableName to set.
-   */
-  public void setAggTableName(String aggTableName) {
-    this.aggTableName = aggTableName;
-  }
-
-  /**
-   * isRetentionRequest
-   *
-   * @return
-   */
-  public boolean isRetentionRequest() {
-    return isRetentionRequest;
-  }
-
-  /**
-   * @param isRetentionRequest
-   */
-  public void setRetentionRequest(boolean isRetentionRequest) {
-    this.isRetentionRequest = isRetentionRequest;
-  }
-
-  /**
-   * getLoadMetadataDetails.
-   *
-   * @return
-   */
-  public List<LoadMetadataDetails> getLoadMetadataDetails() {
-    return loadMetadataDetails;
-  }
-
-  /**
-   * setLoadMetadataDetails.
-   *
-   * @param loadMetadataDetails
-   */
-  public void setLoadMetadataDetails(List<LoadMetadataDetails> loadMetadataDetails) {
-    this.loadMetadataDetails = loadMetadataDetails;
-  }
-
-  /**
-   * @return
-   */
-  public String getTaskNo() {
-    return taskNo;
-  }
-
-  /**
-   * @param taskNo
-   */
-  public void setTaskNo(String taskNo) {
-    this.taskNo = taskNo;
-  }
-
-  /**
-   * @return
-   */
-  public String getFactTimeStamp() {
-    return factTimeStamp;
-  }
-
-  /**
-   * @param factTimeStamp
-   */
-  public void setFactTimeStamp(String factTimeStamp) {
-    this.factTimeStamp = factTimeStamp;
-  }
-
-  public String[] getDelimiters() {
-    return new String[] { complexDelimiterLevel1, complexDelimiterLevel2 };
-  }
-
-  /**
-   * @return load Id
-   */
-  public String getSegmentId() {
-    return segmentId;
-  }
-
-  /**
-   * @param segmentId
-   */
-  public void setSegmentId(String segmentId) {
-    this.segmentId = segmentId;
-  }
-
-  /**
-   * the method returns the value to be treated as null while data load
-   * @return
-   */
-  public String getSerializationNullFormat() {
-    return serializationNullFormat;
-  }
-
-  /**
-   * the method sets the value to be treated as null while data load
-   * @param serializationNullFormat
-   */
-  public void setSerializationNullFormat(String serializationNullFormat) {
-    this.serializationNullFormat = serializationNullFormat;
-  }
-
-  /**
-   * returns the string to enable bad record logger
-   * @return
-   */
-  public String getBadRecordsLoggerEnable() {
-    return badRecordsLoggerEnable;
-  }
-
-  /**
-   * method sets the string to specify whether to enable or dissable the badrecord logger.
-   * @param badRecordsLoggerEnable
-   */
-  public void setBadRecordsLoggerEnable(String badRecordsLoggerEnable) {
-    this.badRecordsLoggerEnable = badRecordsLoggerEnable;
-  }
-
-  public String getQuoteChar() {
-    return quoteChar;
-  }
-
-  public void setQuoteChar(String quoteChar) {
-    this.quoteChar = quoteChar;
-  }
-
-  public String getCommentChar() {
-    return commentChar;
-  }
-
-  public void setCommentChar(String commentChar) {
-    this.commentChar = commentChar;
-  }
-
-  public String getDateFormat() { return dateFormat; }
-
-  public void setDateFormat(String dateFormat) { this.dateFormat = dateFormat; }
-
-  /**
-   * @return
-   */
-  public String getMaxColumns() {
-    return maxColumns;
-  }
-
-  /**
-   * @param maxColumns
-   */
-  public void setMaxColumns(String maxColumns) {
-    this.maxColumns = maxColumns;
-  }
-
-  /**
-   *  returns option to specify the bad record logger action
-   * @return
-   */
-  public String getBadRecordsAction() {
-    return badRecordsAction;
-  }
-
-  /**
-   * set option to specify the bad record logger action
-   * @param badRecordsAction
-   */
-  public void setBadRecordsAction(String badRecordsAction) {
-    this.badRecordsAction = badRecordsAction;
-  }
-
-  public String getRddIteratorKey() {
-    return rddIteratorKey;
-  }
-
-  public void setRddIteratorKey(String rddIteratorKey) {
-    this.rddIteratorKey = rddIteratorKey;
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index fa796ab..3d670a2 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -79,6 +79,7 @@ import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
 import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
 import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.spark.merger.NodeBlockRelation;
 import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index b9ac4dd..d1ff644 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -45,6 +45,7 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class DeleteLoadFolders {
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 32a91de..6c9ca41 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.integration.spark.merger.CompactionType;
 import org.apache.carbondata.lcm.locks.ICarbonLock;
 import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.spark.load.CarbonLoadModel;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 import org.apache.carbondata.spark.load.CarbonLoaderUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index a897e80..6b56545 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
 import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.spark.load.CarbonLoadModel;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class LoadMetadataUtil {
   private LoadMetadataUtil() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index 1939095..65ff787 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -123,8 +123,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) extends Logging {
       Map(("fileheader" -> header)),
       false,
       null,
-      Some(dataFrame),
-      options.useKettle).run(cc)
+      Some(dataFrame)).run(cc)
   }
 
   private def csvPackage: String = "com.databricks.spark.csv.newapi"

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 4baeb67..856e67c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.processing.constants.DataProcessorConstants
 import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.DataLoadResult
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
@@ -165,7 +166,6 @@ class SparkPartitionLoader(model: CarbonLoadModel,
  * @param sc                    The SparkContext to associate the RDD with.
  * @param result                Output result
  * @param carbonLoadModel       Carbon load model which contain the load info
- * @param storeLocation         Tmp store location
  * @param storePath             The store location
  * @param kettleHomePath        The kettle home path
  * @param partitioner           Partitioner which specify how to partition
@@ -182,7 +182,6 @@ class DataFileLoaderRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    var storeLocation: String,
     storePath: String,
     kettleHomePath: String,
     partitioner: Partitioner,
@@ -482,7 +481,6 @@ class DataFileLoaderRDD[K, V](
  * @param sc
  * @param result
  * @param carbonLoadModel
- * @param storeLocation
  * @param storePath
  * @param kettleHomePath
  * @param columinar
@@ -497,7 +495,6 @@ class DataFrameLoaderRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    var storeLocation: String,
     storePath: String,
     kettleHomePath: String,
     columinar: Boolean,

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f97bd43..4392efe 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -27,12 +27,13 @@ import scala.util.Random
 import scala.util.control.Breaks._
 
 import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{util => _, _}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
-import org.apache.spark.sql.hive.{DistributionUtil}
+import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,6 +48,8 @@ import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, Com
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
@@ -660,16 +663,15 @@ object CarbonDataRDDFactory extends Logging {
 
   def loadCarbonData(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storeLocation: String,
       storePath: String,
       kettleHomePath: String,
       partitioner: Partitioner,
       columinar: Boolean,
-      isAgg: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
+      useKettle: Boolean,
       dataFrame: Option[DataFrame] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-
+    val isAgg = false
     // for handling of the segment Merging.
     def handleSegmentMerging(tableCreationTime: Long): Unit = {
       logger
@@ -761,6 +763,11 @@ object CarbonDataRDDFactory extends Logging {
         .audit("Data load request has been received for table " + carbonLoadModel
           .getDatabaseName + "." + carbonLoadModel.getTableName
         )
+      if (!useKettle) {
+        logger.audit("Data is loading with New Data Flow for table " + carbonLoadModel
+            .getDatabaseName + "." + carbonLoadModel.getTableName
+          )
+      }
       // Check if any load need to be deleted before loading new data
       deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath,
         isForceDeletion = false)
@@ -938,7 +945,7 @@ object CarbonDataRDDFactory extends Logging {
             val blockDetailsList =
               entry._2.asScala.map(distributable => {
                 val tableBlock = distributable.asInstanceOf[TableBlockInfo]
-                new BlockDetails(tableBlock.getFilePath,
+                new BlockDetails(new Path(tableBlock.getFilePath),
                   tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
                 )
               }).toArray
@@ -947,20 +954,29 @@ object CarbonDataRDDFactory extends Logging {
           ).toArray
         }
 
-        status = new DataFileLoaderRDD(sqlContext.sparkContext,
-              new DataLoadResultImpl(),
-              carbonLoadModel,
-              storeLocation,
-          storePath,
-              kettleHomePath,
-              partitioner,
-              columinar,
-              currentLoadCount,
-              tableCreationTime,
-              schemaLastUpdatedTime,
-              blocksGroupBy,
-              isTableSplitPartition
-            ).collect()
+        if (useKettle) {
+          status = new DataFileLoaderRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            storePath,
+            kettleHomePath,
+            partitioner,
+            columinar,
+            currentLoadCount,
+            tableCreationTime,
+            schemaLastUpdatedTime,
+            blocksGroupBy,
+            isTableSplitPartition
+          ).collect()
+        } else {
+          status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            partitioner,
+            currentLoadCount,
+            blocksGroupBy,
+            isTableSplitPartition).collect()
+        }
       }
 
       def loadDataFrame(): Unit = {
@@ -972,7 +988,6 @@ object CarbonDataRDDFactory extends Logging {
         status = new DataFrameLoaderRDD(sqlContext.sparkContext,
           new DataLoadResultImpl(),
           carbonLoadModel,
-          storeLocation,
           storePath,
           kettleHomePath,
           columinar,
@@ -1028,7 +1043,8 @@ object CarbonDataRDDFactory extends Logging {
           loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
           ex match {
             case sparkException: SparkException =>
-              if (sparkException.getCause.isInstanceOf[DataLoadingException]) {
+              if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
+                  sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
                 executorMessage = sparkException.getCause.getMessage
                 errorMessage = errorMessage + ": " + executorMessage
               }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 8b4b74a..b7da579 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -39,7 +39,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil._

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 2bfd99d..b8f4087 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -39,10 +39,11 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
 import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
+import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.QueryPlanUtil
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
new file mode 100644
index 0000000..6affe66
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.model.CarbonLoadModel
+import org.apache.carbondata.processing.newflow.DataLoadExecutor
+import org.apache.carbondata.processing.newflow.exception.BadRecordFoundException
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads data into Carbon using the new data load flow ([[AbstractDataLoadProcessorStep]]).
+ */
+class NewCarbonDataLoadRDD[K, V](
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    partitioner: Partitioner,
+    loadCount: Integer,
+    blocksGroupBy: Array[(String, Array[BlockDetails])],
+    isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  private val confBroadcast =
+    sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+    if (isTableSplitPartition) {
+      // for table split partition
+      var splits: Array[TableSplit] = null
+
+      if (carbonLoadModel.isDirectLoad) {
+        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+          partitioner.nodeList, partitioner.partitionCount)
+      } else {
+        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+          carbonLoadModel.getTableName, null, partitioner)
+      }
+
+      splits.zipWithIndex.map { s =>
+        // filter by the partition unique id; exactly one entry matches, so take element 0
+        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+          p._1 == s._1.getPartition.getUniqueID)(0)._2
+        new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+      }
+    } else {
+      // for node partition
+      blocksGroupBy.zipWithIndex.map { b =>
+        new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+      }
+    }
+  }
+
+  override def checkpoint() {
+    // Do nothing. Hadoop RDD should not be checkpointed.
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val iter = new Iterator[(K, V)] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      var model: CarbonLoadModel = _
+      var uniqueLoadStatusId =
+        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
+      try {
+        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        val recordReaders = getInputIterators
+        val loader = new SparkPartitionLoader(model,
+          theSplit.index,
+          null,
+          null,
+          loadCount,
+          loadMetadataDetails)
+        // Initialize to set carbon properties
+        loader.initialize()
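+        // mark success up-front; a BadRecordFoundException below downgrades it to partial success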
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        new DataLoadExecutor().execute(model,
+          loader.storeLocation,
+          recordReaders)
+      } catch {
+        case e: BadRecordFoundException =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          logInfo("Bad Record Found")
+        case e: Exception =>
+          logInfo("DataLoad failure", e)
+          LOGGER.error(e)
+          throw e
+      }
+
+      def getInputIterators: Array[util.Iterator[Array[AnyRef]]] = {
+        val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, theSplit.index, 0)
+        val configuration: Configuration = confBroadcast.value.value
+        configureCSVInputFormat(configuration)
+        val hadoopAttemptContext = newTaskAttemptContext(configuration, attemptId)
+        val format = new CSVInputFormat
+        if (isTableSplitPartition) {
+          // for table split partition
+          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
+          logInfo("Input split: " + split.serializableHadoopSplit.value)
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          if (carbonLoadModel.isDirectLoad) {
+            model = carbonLoadModel.getCopyWithPartition(
+                split.serializableHadoopSplit.value.getPartition.getUniqueID,
+                split.serializableHadoopSplit.value.getPartition.getFilesPath,
+                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(
+                split.serializableHadoopSplit.value.getPartition.getUniqueID)
+          }
+          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
+
+          StandardLogService.setThreadName(partitionID, null)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
+              partitionID, split.partitionBlocksDetail.length)
+          val readers =
+            split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+          readers.zipWithIndex.foreach { case (reader, index) =>
+            reader.initialize(split.partitionBlocksDetail(index), hadoopAttemptContext)
+          }
+          readers.map(new RecordReaderIterator(_))
+        } else {
+          // for node partition
+          val split = theSplit.asInstanceOf[CarbonNodePartition]
+          logInfo("Input split: " + split.serializableHadoopSplit)
+          logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
+              split.serializableHadoopSplit, split.nodeBlocksDetail.length)
+          val blocksID = generateBlocksID
+          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+          if (carbonLoadModel.isDirectLoad) {
+            val filelist: java.util.List[String] = new java.util.ArrayList[String](
+                CarbonCommonConstants.CONSTANT_SIZE_TEN)
+            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
+            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
+                carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+          } else {
+            model = carbonLoadModel.getCopyWithPartition(partitionID)
+          }
+          StandardLogService.setThreadName(blocksID, null)
+          val readers =
+            split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
+          readers.zipWithIndex.foreach { case (reader, index) =>
+            reader.initialize(split.nodeBlocksDetail(index), hadoopAttemptContext)
+          }
+          readers.map(new RecordReaderIterator(_))
+        }
+      }
+
+      def configureCSVInputFormat(configuration: Configuration): Unit = {
+        CSVInputFormat.setCommentCharacter(carbonLoadModel.getCommentChar, configuration)
+        CSVInputFormat.setCSVDelimiter(carbonLoadModel.getCsvDelimiter, configuration)
+        CSVInputFormat.setEscapeCharacter(carbonLoadModel.getEscapeChar, configuration)
+        CSVInputFormat.setHeaderExtractionEnabled(
+          carbonLoadModel.getCsvHeader == null || carbonLoadModel.getCsvHeader.isEmpty,
+          configuration)
+        CSVInputFormat.setQuoteCharacter(carbonLoadModel.getQuoteChar, configuration)
+      }
+
+      /**
+       * Generates a unique blocks ID for this split.
+       *
+       * @return the blocks ID string
+       */
+      def generateBlocksID: String = {
+        if (isTableSplitPartition) {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
+            .getPartition.getUniqueID + "_" + UUID.randomUUID()
+        } else {
+          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
+          UUID.randomUUID()
+        }
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    isTableSplitPartition match {
+      case true =>
+        // for table split partition
+        val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
+        val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
+        location
+      case false =>
+        // for node partition
+        val theSplit = split.asInstanceOf[CarbonNodePartition]
+        val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
+        logInfo("Preferred Location for split : " + firstOptionLocation.head)
+        val blockMap = new util.LinkedHashMap[String, Integer]()
+        val tableBlocks = theSplit.blocksDetails
+        tableBlocks.foreach { tableBlock =>
+          tableBlock.getLocations.foreach { location =>
+            if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) {
+              val currentCount = blockMap.get(location)
+              if (currentCount == null) {
+                blockMap.put(location, 1)
+              } else {
+                blockMap.put(location, currentCount + 1)
+              }
+            }
+          }
+        }
+
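+        // prefer the hosts holding the most blocks of this split, after the node itself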
+        val sortedList = blockMap.entrySet().asScala.toSeq.sortWith { (nodeCount1, nodeCount2) =>
+          nodeCount1.getValue > nodeCount2.getValue
+        }
+
+        val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
+        firstOptionLocation ++ sortedNodesList
+    }
+  }
+}
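
As an aside on the configureCSVInputFormat helper above: each CSVInputFormat setter
takes the raw option value plus the Hadoop Configuration that is later broadcast to
the executors. A minimal standalone sketch, with hypothetical option values standing
in for what a real CarbonLoadModel would supply:

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.hadoop.csv.CSVInputFormat

    val conf = new Configuration()
    CSVInputFormat.setCSVDelimiter(",", conf)             // field separator
    CSVInputFormat.setQuoteCharacter("\"", conf)          // quote character
    CSVInputFormat.setEscapeCharacter("\\", conf)         // escape character
    CSVInputFormat.setCommentCharacter("#", conf)         // comment marker
    CSVInputFormat.setHeaderExtractionEnabled(true, conf) // first row supplies the header

    // the configured conf is then handed to createRecordReader(...) per block, and each
    // reader is wrapped in a RecordReaderIterator to feed rows to DataLoadExecutor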

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index d14eedf..bd295bc 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -46,9 +46,9 @@ import org.apache.carbondata.core.reader.CarbonDictionaryReader
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
 import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.load.CarbonLoadModel
 import org.apache.carbondata.spark.rdd._
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala b/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
new file mode 100644
index 0000000..84f398a
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/mapred/SparkMapRedUtil.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mapred
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.util.SerializableConfiguration
+
+/**
+ * It is just a dummy trait to access Spark's package-restricted SparkHadoopMapReduceUtil.
+ */
+trait CarbonHadoopMapReduceUtil extends SparkHadoopMapReduceUtil {
+
+}
+
+class CarbonSerializableConfiguration(@transient var conf: Configuration)
+  extends SerializableConfiguration(conf)
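
Because SparkHadoopMapReduceUtil is package-private to Spark, this bridge has to live
under the org.apache.spark.mapred package; Carbon classes then simply mix the trait in.
A hedged sketch of the intended usage, mirroring NewCarbonDataLoadRDD above
(MyCarbonTask is a hypothetical name):

    import java.text.SimpleDateFormat
    import java.util.Date

    import org.apache.spark.SparkContext
    import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, CarbonSerializableConfiguration}

    class MyCarbonTask(sc: SparkContext) extends CarbonHadoopMapReduceUtil {
      // broadcast once: a Hadoop Configuration is too large to ship with every task
      private val confBroadcast =
        sc.broadcast(new CarbonSerializableConfiguration(sc.hadoopConfiguration))

      def attemptContext(jobId: Int, splitIndex: Int) = {
        val trackerId = new SimpleDateFormat("yyyyMMddHHmm").format(new Date())
        val attemptId = newTaskAttemptID(trackerId, jobId, isMap = true, splitIndex, 0)
        newTaskAttemptContext(confBroadcast.value.value, attemptId)
      }
    }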

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 02912fd..6f149f7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -969,9 +969,8 @@ class CarbonSqlParser()
             validateOptions(optionsList)
           }
           val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
-          val useKettle = optionsMap.getOrElse("USE_KETTLE", "true").toBoolean
           LoadTable(databaseNameOp, tableName, filePath, Seq(), optionsMap,
-            isOverwrite.isDefined, useKettle = useKettle)
+            isOverwrite.isDefined)
       }
 
   private def validateOptions(optionList: Option[List[(String, String)]]): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index f285984..ed757e3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -56,6 +56,7 @@ import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.constants.TableOptionConstant
 import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load._
@@ -1026,33 +1027,6 @@ case class LoadTable(
     options: scala.collection.immutable.Map[String, String],
     isOverwriteExist: Boolean = false,
     var inputSqlString: String = null,
-    dataFrame: Option[DataFrame] = None,
-    useKettle: Boolean = true) extends RunnableCommand {
-
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    if (useKettle) {
-      LoadTableUsingKettle(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
-        options, isOverwriteExist, inputSqlString, dataFrame).run(sqlContext)
-    } else {
-      LoadTableUsingProcessorStep().run(sqlContext)
-    }
-  }
-}
-
-case class LoadTableUsingProcessorStep() extends RunnableCommand {
-  def run(sqlContext: SQLContext): Seq[Row] = {
-    throw new UnsupportedOperationException("work in progress")
-  }
-}
-
-case class LoadTableUsingKettle(
-    databaseNameOp: Option[String],
-    tableName: String,
-    factPathFromUser: String,
-    dimFilesPath: Seq[DataLoadTableFileMapping],
-    options: scala.collection.immutable.Map[String, String],
-    isOverwriteExist: Boolean = false,
-    var inputSqlString: String = null,
     dataFrame: Option[DataFrame] = None) extends RunnableCommand {
 
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -1112,24 +1086,28 @@ case class LoadTableUsingKettle(
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      var storeLocation = ""
       val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != configuredStore && configuredStore.nonEmpty) {
-        storeLocation = configuredStore(Random.nextInt(configuredStore.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
 
       var partitionLocation = relation.tableMeta.storePath + "/partition/" +
                               relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
                               relation.tableMeta.carbonTableIdentifier.getTableName + "/"
 
-      storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
       val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
       val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
 
+      // TODO: remove this option handling once the kettle flow is removed.
+      val useKettle = options.get("use_kettle") match {
+        case Some(value) => value.toBoolean
+        case _ =>
+          val useKettleLocal = System.getProperty("use.kettle")
+          if (useKettleLocal == null) {
+            sqlContext.sparkContext.getConf.get("use_kettle_default", "true").toBoolean
+          } else {
+            useKettleLocal.toBoolean
+          }
+      }
+
       val delimiter = options.getOrElse("delimiter", ",")
       val quoteChar = options.getOrElse("quotechar", "\"")
       val fileHeader = options.getOrElse("fileheader", "")
@@ -1198,10 +1176,15 @@ case class LoadTableUsingKettle(
         GlobalDictionaryUtil
           .generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
             dataFrame)
-        CarbonDataRDDFactory
-          .loadCarbonData(sqlContext, carbonLoadModel, storeLocation, relation.tableMeta.storePath,
+        CarbonDataRDDFactory.loadCarbonData(sqlContext,
+            carbonLoadModel,
+            relation.tableMeta.storePath,
             kettleHomePath,
-            relation.tableMeta.partitioner, columinar, isAgg = false, partitionStatus, dataFrame)
+            relation.tableMeta.partitioner,
+            columinar,
+            partitionStatus,
+            useKettle,
+            dataFrame)
       }
       catch {
         case ex: Exception =>
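
Note the resolution order for the kettle switch introduced above: the per-load option
wins, then the JVM system property, then a Spark conf default, falling back to true.
A hedged illustration (table and path are hypothetical, and the OPTIONS key is assumed
to reach LoadTable lower-cased as "use_kettle"):

    // 1. per load: an option on the LOAD DATA statement
    sql("LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE t OPTIONS('USE_KETTLE'='false')")
    // 2. per JVM: -Duse.kettle=false, read via System.getProperty("use.kettle")
    // 3. cluster-wide: SparkConf entry "use_kettle_default", defaulting to "true"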

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 03ddeff..a755d5d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -228,12 +228,12 @@ class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] {
       case ShowLoadsCommand(databaseName, table, limit) =>
         ExecutedCommand(ShowLoads(databaseName, table, limit, plan.output)) :: Nil
       case LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
-      options, isOverwriteExist, inputSqlString, dataFrame, useKettle) =>
+      options, isOverwriteExist, inputSqlString, dataFrame) =>
         val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
             .tableExists(TableIdentifier(tableName, databaseNameOp))(sqlContext)
         if (isCarbonTable || options.nonEmpty) {
           ExecutedCommand(LoadTable(databaseNameOp, tableName, factPathFromUser, dimFilesPath,
-            options, isOverwriteExist, inputSqlString, dataFrame, useKettle)) :: Nil
+            options, isOverwriteExist, inputSqlString, dataFrame)) :: Nil
         } else {
           ExecutedCommand(HiveNativeCommand(inputSqlString)) :: Nil
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
index 22713da..03e15c1 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/SplitUtils.scala
@@ -56,7 +56,7 @@ object SplitUtils {
         part.asInstanceOf[NewHadoopPartition].serializableHadoopSplit.value.asInstanceOf[FileSplit]
       }
       splits.map { block =>
-        new BlockDetails(block.getPath.toString,
+        new BlockDetails(block.getPath,
           block.getStart,
           block.getLength,
           block.getLocations

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
index 0c039bd..fa8731a 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
@@ -24,7 +24,7 @@ import java.io.File
 import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
 import org.apache.spark.util.FileUtils
 
-import org.apache.carbondata.spark.load.CarbonLoadModel
+import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 2b19829..c215a46 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -23,11 +23,13 @@ import java.io.File
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
+
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.load.CarbonLoadModel
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.processing.model.CarbonLoadModel
+
 /**
   * Test Case for org.apache.carbondata.integration.spark.util.GlobalDictionaryUtil
   *

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
index e7a549c..3b06311 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/AutoHighCardinalityIdentifyTestCase.scala
@@ -25,13 +25,14 @@ import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.{CarbonHiveContext, QueryTest}
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.spark.load.CarbonLoadModel
+import org.apache.carbondata.processing.model.CarbonLoadModel
 
 /**
   * Test Case for org.apache.carbondata.spark.util.GlobalDictionaryUtil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 97bcb64..07d1a16 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -24,13 +24,14 @@ import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.CarbonLoadModel
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation}
 import org.apache.spark.sql.common.util.CarbonHiveContext
 import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.processing.model.CarbonLoadModel
+
 /**
  * Test case for external column dictionary generation;
  * also supports complex types.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
index 179d0f6..9cbdb7e 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtilConcurrentTestCase.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.common.util.CarbonHiveContext.sql
 import org.apache.spark.sql.common.util.QueryTest
 
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema
-import org.apache.carbondata.spark.load.CarbonLoadModel
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
@@ -39,11 +38,11 @@ import java.util.concurrent.Callable
 import java.util.concurrent.TimeUnit
 
 import org.apache.carbondata.common.ext.PathFactory
-
 import org.apache.carbondata.core.carbon.path.CarbonTablePath
 import org.apache.carbondata.core.carbon.ColumnIdentifier
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.processing.model.CarbonLoadModel
 
 class GlobalDictionaryUtilConcurrentTestCase extends QueryTest with BeforeAndAfterAll {