You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/10 14:49:24 UTC

[2/5] incubator-carbondata git commit: Data load integration of all steps for removing kettle

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index 78b9290..e0a7ceb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -30,10 +30,12 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
 import org.apache.carbondata.processing.newflow.converter.FieldConverter;
 import org.apache.carbondata.processing.newflow.converter.RowConverter;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
 
 /**
  * It converts the complete row if necessary, dictionary columns are encoded with dictionary values
@@ -43,14 +45,30 @@ public class RowConverterImpl implements RowConverter {
 
   private CarbonDataLoadConfiguration configuration;
 
+  private DataField[] fields;
+
   private FieldConverter[] fieldConverters;
 
-  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration) {
+  private BadRecordsLogger badRecordLogger;
+
+  private BadRecordLogHolder logHolder;
+
+  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
+      BadRecordsLogger badRecordLogger) {
+    this.fields = fields;
     this.configuration = configuration;
+    this.badRecordLogger = badRecordLogger;
+  }
+
+  @Override
+  public void initialize() {
     CacheProvider cacheProvider = CacheProvider.getInstance();
     Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache =
         cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
             configuration.getTableIdentifier().getStorePath());
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
     List<FieldConverter> fieldConverterList = new ArrayList<>();
 
     long lruCacheStartTime = System.currentTimeMillis();
@@ -58,20 +76,27 @@ public class RowConverterImpl implements RowConverter {
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i);
-      if (fieldConverter != null) {
-        fieldConverterList.add(fieldConverter);
-      }
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat);
+      fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
     fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+    logHolder = new BadRecordLogHolder();
   }
 
   @Override
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+    CarbonRow copy = row.getCopy();
     for (int i = 0; i < fieldConverters.length; i++) {
-      fieldConverters[i].convert(row);
+      fieldConverters[i].convert(row, logHolder);
+      if (logHolder.isBadRecordNotAdded()) {
+        badRecordLogger.addBadRecordsToBuilder(copy.getData(), logHolder.getReason());
+        logHolder.clear();
+        if(badRecordLogger.isBadRecordConvertNullDisable()) {
+          return null;
+        }
+      }
     }
     return row;
   }
@@ -81,8 +106,8 @@ public class RowConverterImpl implements RowConverter {
     List<Integer> dimCardinality = new ArrayList<>();
     for (int i = 0; i < fieldConverters.length; i++) {
       if (fieldConverters[i] instanceof AbstractDictionaryFieldConverterImpl) {
-        dimCardinality.add(
-            ((AbstractDictionaryFieldConverterImpl) fieldConverters[i]).getColumnCardinality());
+        ((AbstractDictionaryFieldConverterImpl) fieldConverters[i])
+            .fillColumnCardinality(dimCardinality);
       }
     }
     int[] cardinality = new int[dimCardinality.size()];
@@ -93,4 +118,13 @@ public class RowConverterImpl implements RowConverter {
     configuration.setDataLoadProperty(DataLoadProcessorConstants.DIMENSION_LENGTHS, cardinality);
   }
 
+  /** Returns a converter that reuses this one's field converters but owns a new log holder. */
+  @Override
+  public RowConverter createCopyForNewThread() {
+    RowConverterImpl threadCopy = new RowConverterImpl(fields, configuration, badRecordLogger);
+    threadCopy.fieldConverters = this.fieldConverters;
+    threadCopy.logHolder = new BadRecordLogHolder();
+    return threadCopy;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
new file mode 100644
index 0000000..0c15c4b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DirectDictionary.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.dictionary;
+
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+
+/**
+ * It is used for generating dictionary from value itself, like timestamp can be used directly as
+ * dictionary.
+ */
+public class DirectDictionary implements BiDictionary<Integer, Object> {
+  private DirectDictionaryGenerator dictionaryGenerator;
+
+  public DirectDictionary(DirectDictionaryGenerator dictionaryGenerator) {
+    this.dictionaryGenerator = dictionaryGenerator;
+  }
+
+  /** Same lookup as getKey; throws UnsupportedOperationException if it yields null. */
+  @Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
+    Integer surrogate = getKey(value);
+    if (surrogate != null) {
+      return surrogate;
+    }
+    throw new UnsupportedOperationException("trying to add new entry in DirectDictionary");
+  }
+
+  /** Hands the value's string form to the generator and returns the resulting surrogate key. */
+  @Override public Integer getKey(Object value) {
+    return dictionaryGenerator.generateDirectSurrogateKey(value.toString());
+  }
+
+  /** Asks the generator for the value behind the given surrogate key. */
+  @Override public Object getValue(Integer key) {
+    return dictionaryGenerator.getValueFromSurrogate(key);
+  }
+
+  /** Always reports Integer.MAX_VALUE; this class keeps no entry count. */
+  @Override public int size() {
+    return Integer.MAX_VALUE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
index f807a81..3f0a9f0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/PreCreatedDictionary.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 
-public class PreCreatedDictionary implements BiDictionary<Integer, String> {
+public class PreCreatedDictionary implements BiDictionary<Integer, Object> {
 
   private Dictionary dictionary;
 
@@ -32,7 +32,7 @@ public class PreCreatedDictionary implements BiDictionary<Integer, String> {
   }
 
   @Override
-  public Integer getOrGenerateKey(String value) throws DictionaryGenerationException {
+  public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
     Integer key = getKey(value);
     if (key == null) {
       throw new UnsupportedOperationException("trying to add new entry in PreCreatedDictionary");
@@ -41,8 +41,8 @@ public class PreCreatedDictionary implements BiDictionary<Integer, String> {
   }
 
   @Override
-  public Integer getKey(String value) {
-    return dictionary.getSurrogateKey(value);
+  public Integer getKey(Object value) {
+    return dictionary.getSurrogateKey(value.toString());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
new file mode 100644
index 0000000..7c1126e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/BadRecordFoundException.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.exception;
+
+/** Checked exception that carries an error message and, optionally, a cause. */
+public class BadRecordFoundException extends Exception {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /** The error message returned by {@link #getMessage()}. */
+  private String msg = "";
+
+  /**
+   * Creates the exception with a message only.
+   *
+   * @param msg The error message for this exception.
+   */
+  public BadRecordFoundException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Creates the exception with a message and the underlying cause.
+   *
+   * @param msg The error message for this exception.
+   * @param t   The cause of this exception.
+   */
+  public BadRecordFoundException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Creates the exception from a cause; the message is the cause's string form.
+   *
+   * @param t The cause of this exception; with a null cause the message stays empty.
+   */
+  public BadRecordFoundException(Throwable t) {
+    super(t);
+    // getMessage() returns msg, so copy the cause's description instead of leaving "".
+    this.msg = (t == null) ? "" : t.toString();
+  }
+
+  /** Returns the message given at construction, or the cause's string form. */
+  @Override public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
index fcf122e..a0139f5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/CarbonParserFactory.java
@@ -30,42 +30,47 @@ public final class CarbonParserFactory {
 
   /**
    * Create parser for the carbon column.
+   *
    * @param carbonColumn
    * @param complexDelimiters
    * @return
    */
-  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters) {
-    return createParser(carbonColumn, complexDelimiters, 0);
+  public static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
+      String nullFormat) {
+    return createParser(carbonColumn, complexDelimiters, nullFormat, 0);
   }
 
   /**
    * This method may be called recursively if the carbon column is complex type.
+   *
    * @param carbonColumn
    * @param complexDelimiters, these delimiters which are used to separate the complex data types.
-   * @param depth It is like depth of tree, if column has children then depth is 1, And depth
-   *              becomes 2 if children has children. This depth is used select the complex
-   *              delimiters
+   * @param depth              It is like the depth of a tree: if the column has children then
+   *                           depth is 1, and it becomes 2 if the children have children.
+   *                           This depth is used to select the complex delimiter.
+   * @param nullFormat         Text that the complex parsers treat as a null value.
    * @return GenericParser
    */
   private static GenericParser createParser(CarbonColumn carbonColumn, String[] complexDelimiters,
-      int depth) {
+      String nullFormat, int depth) {
     switch (carbonColumn.getDataType()) {
       case ARRAY:
         List<CarbonDimension> listOfChildDimensions =
             ((CarbonDimension) carbonColumn).getListOfChildDimensions();
         // Create array parser with complex delimiter
-        ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth]);
+        ArrayParserImpl arrayParser = new ArrayParserImpl(complexDelimiters[depth], nullFormat);
         for (CarbonDimension dimension : listOfChildDimensions) {
-          arrayParser.addChildren(createParser(dimension, complexDelimiters, depth + 1));
+          arrayParser
+              .addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
         }
         return arrayParser;
       case STRUCT:
         List<CarbonDimension> dimensions =
             ((CarbonDimension) carbonColumn).getListOfChildDimensions();
         // Create struct parser with complex delimiter
-        StructParserImpl parser = new StructParserImpl(complexDelimiters[depth]);
+        StructParserImpl parser = new StructParserImpl(complexDelimiters[depth], nullFormat);
         for (CarbonDimension dimension : dimensions) {
-          parser.addChildren(createParser(dimension, complexDelimiters, depth + 1));
+          parser.addChildren(createParser(dimension, complexDelimiters, nullFormat, depth + 1));
         }
         return parser;
       case MAP:

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
index bdc13ab..e500bf9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/GenericParser.java
@@ -22,7 +22,7 @@ package org.apache.carbondata.processing.newflow.parser;
  * Parse the data according to implementation, The implementation classes can be struct, array or
  * map datatypes.
  * It remains thread safe as the state of implementation class should not change while
- * calling @{@link GenericParser#parse(String)} method
+ * calling {@link GenericParser#parse(Object)} method
  */
 public interface GenericParser<E> {
 
@@ -31,6 +31,6 @@ public interface GenericParser<E> {
    * @param data
    * @return
    */
-  E parse(String data);
+  E parse(Object data);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
index 7fc95e7..93ee4d4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/ArrayParserImpl.java
@@ -18,8 +18,6 @@
  */
 package org.apache.carbondata.processing.newflow.parser.impl;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -28,33 +26,38 @@ import org.apache.carbondata.processing.newflow.parser.ComplexParser;
 import org.apache.carbondata.processing.newflow.parser.GenericParser;
 
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
 
 /**
  * It parses the string to @{@link ArrayObject} using delimiter.
  * It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(String)} method
+ * calling {@link GenericParser#parse(Object)} method
  */
 public class ArrayParserImpl implements ComplexParser<ArrayObject> {
 
   private Pattern pattern;
 
-  private List<GenericParser> children = new ArrayList<>();
+  private GenericParser child;
 
-  public ArrayParserImpl(String delimiter) {
+  private String nullFormat;
+
+  public ArrayParserImpl(String delimiter, String nullFormat) {
     pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
+    this.nullFormat = nullFormat;
   }
 
   @Override
-  public ArrayObject parse(String data) {
-    if (StringUtils.isNotEmpty(data)) {
-      String[] split = pattern.split(data, -1);
-      if (ArrayUtils.isNotEmpty(split)) {
-        Object[] array = new Object[children.size()];
-        for (int i = 0; i < children.size(); i++) {
-          array[i] = children.get(i).parse(split[i]);
+  public ArrayObject parse(Object data) {
+    if (data != null) {
+      String value = data.toString();
+      if (!value.isEmpty() && !value.equals(nullFormat)) {
+        String[] split = pattern.split(value, -1);
+        if (ArrayUtils.isNotEmpty(split)) {
+          Object[] array = new Object[split.length];
+          for (int i = 0; i < split.length; i++) {
+            array[i] = child.parse(split[i]);
+          }
+          return new ArrayObject(array);
         }
-        return new ArrayObject(array);
       }
     }
     return null;
@@ -62,6 +65,6 @@ public class ArrayParserImpl implements ComplexParser<ArrayObject> {
 
   @Override
   public void addChildren(GenericParser parser) {
-    children.add(parser);
+    child = parser;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
index a7c08b5..a9940ae 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/PrimitiveParserImpl.java
@@ -23,7 +23,7 @@ import org.apache.carbondata.processing.newflow.parser.GenericParser;
 public class PrimitiveParserImpl implements GenericParser<Object> {
 
   @Override
-  public Object parse(String data) {
+  public Object parse(Object data) {
     return data;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
index 9c1edd1..c438f89 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java
@@ -18,6 +18,10 @@
  */
 package org.apache.carbondata.processing.newflow.parser.impl;
 
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
 import org.apache.carbondata.processing.newflow.parser.GenericParser;
 import org.apache.carbondata.processing.newflow.parser.RowParser;
 
@@ -25,15 +29,69 @@ public class RowParserImpl implements RowParser {
 
   private GenericParser[] genericParsers;
 
-  public RowParserImpl(GenericParser[] genericParsers) {
-    this.genericParsers = genericParsers;
+  private int[] outputMapping;
+
+  private int[] inputMapping;
+
+  private int numberOfColumns;
+
+  /** Builds one parser per input field and maps each input position to its output position. */
+  public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
+    String[] complexDelimiters =
+        (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
+    String nullFormat = configuration
+        .getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT).toString();
+    DataField[] input = getInput(configuration);
+    genericParsers = new GenericParser[input.length];
+    for (int in = 0; in < input.length; in++) {
+      genericParsers[in] =
+          CarbonParserFactory.createParser(input[in].getColumn(), complexDelimiters, nullFormat);
+    }
+    outputMapping = new int[output.length];
+    for (int in = 0; in < input.length; in++) {
+      for (int out = 0; out < output.length; out++) {
+        if (input[in].getColumn().equals(output[out].getColumn())) {
+          outputMapping[in] = out;
+          break;
+        }
+      }
+    }
+  }
+
+  /** Returns the data fields in header order and records each one's header column index. */
+  public DataField[] getInput(CarbonDataLoadConfiguration configuration) {
+    DataField[] fields = configuration.getDataFields();
+    String[] header = configuration.getHeader();
+    numberOfColumns = header.length;
+    DataField[] input = new DataField[fields.length];
+    inputMapping = new int[input.length];
+    int matched = 0;
+    for (int col = 0; col < numberOfColumns; col++) {
+      for (DataField field : fields) {
+        if (header[col].equalsIgnoreCase(field.getColumn().getColName())) {
+          input[matched] = field;
+          inputMapping[matched++] = col;
+          break;
+        }
+      }
+    }
+    return input;
+  }
 
   @Override
   public Object[] parseRow(Object[] row) {
-    for (int i = 0; i < row.length; i++) {
-      row[i] = genericParsers[i].parse(row[i].toString());
+    // Pad short rows to the header size; Object[] (not String[]) so any element type copies.
+    if (row.length < numberOfColumns) {
+      Object[] temp = new Object[numberOfColumns];
+      System.arraycopy(row, 0, temp, 0, row.length);
+      row = temp;
     }
-    return row;
+    Object[] out = new Object[genericParsers.length];
+    for (int i = 0; i < genericParsers.length; i++) {
+      // inputMapping[i]: source column in the row; outputMapping[i]: slot in the result.
+      out[outputMapping[i]] = genericParsers[i].parse(row[inputMapping[i]]);
+    }
+    return out;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
index b38a2e4..e83c079 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/StructParserImpl.java
@@ -28,12 +28,11 @@ import org.apache.carbondata.processing.newflow.parser.ComplexParser;
 import org.apache.carbondata.processing.newflow.parser.GenericParser;
 
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
 
 /**
  * It parses the string to @{@link StructObject} using delimiter.
  * It is thread safe as the state of class don't change while
- * calling @{@link GenericParser#parse(String)} method
+ * calling {@link GenericParser#parse(Object)} method
  */
 public class StructParserImpl implements ComplexParser<StructObject> {
 
@@ -41,20 +40,26 @@ public class StructParserImpl implements ComplexParser<StructObject> {
 
   private List<GenericParser> children = new ArrayList<>();
 
-  public StructParserImpl(String delimiter) {
+  private String nullFormat;
+
+  public StructParserImpl(String delimiter, String nullFormat) {
     pattern = Pattern.compile(CarbonUtil.delimiterConverter(delimiter));
+    this.nullFormat = nullFormat;
   }
 
   @Override
-  public StructObject parse(String data) {
-    if (StringUtils.isNotEmpty(data)) {
-      String[] split = pattern.split(data, -1);
-      if (ArrayUtils.isNotEmpty(split)) {
-        Object[] array = new Object[children.size()];
-        for (int i = 0; i < children.size(); i++) {
-          array[i] = children.get(i).parse(split[i]);
+  public StructObject parse(Object data) {
+    if (data != null) {
+      String value = data.toString();
+      if (!value.isEmpty() && !value.equals(nullFormat)) {
+        String[] split = pattern.split(value, -1);
+        if (ArrayUtils.isNotEmpty(split)) {
+          Object[] array = new Object[children.size()];
+          for (int i = 0; i < split.length && i < array.length; i++) {
+            array[i] = children.get(i).parse(split[i]);
+          }
+          return new StructObject(array);
         }
-        return new StructObject(array);
       }
     }
     return null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
index 83b07c9..68b87a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -20,6 +20,7 @@
 package org.apache.carbondata.processing.newflow.row;
 
 import java.math.BigDecimal;
+import java.util.Arrays;
 
 /**
  * This row class is used to transfer the row data from one step to other step
@@ -36,6 +37,10 @@ public class CarbonRow {
     return data;
   }
 
+  public void setData(Object[] data) {
+    this.data = data;
+  }
+
   public int getInt(int ordinal) {
     return (int) data[ordinal];
   }
@@ -72,12 +77,21 @@ public class CarbonRow {
     return (Object[]) data[ordinal];
   }
 
-  public Integer[] getIntegerArray(int ordinal) {
-    return (Integer[]) data[ordinal];
+  public int[] getIntArray(int ordinal) {
+    return (int[]) data[ordinal];
   }
 
   public void update(Object value, int ordinal) {
     data[ordinal] = value;
   }
 
+  public CarbonRow getCopy() {
+    Object[] copy = new Object[data.length];
+    System.arraycopy(data, 0, copy, 0, copy.length);
+    return new CarbonRow(copy);
+  }
+
+  @Override public String toString() {
+    return Arrays.toString(data);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 0e575bc..cd487ec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -81,10 +81,10 @@ public class ParallelReadMergeSorterImpl implements Sorter {
         storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
     finalMerger =
         new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(),
-            sortParameters.getDimColCount() - sortParameters.getComplexDimColCount(),
+            sortParameters.getDimColCount(),
             sortParameters.getComplexDimColCount(), sortParameters.getMeasureColCount(),
             sortParameters.getNoDictionaryCount(), sortParameters.getAggType(),
-            sortParameters.getNoDictionaryDimnesionColumn());
+            sortParameters.getNoDictionaryDimnesionColumn(), sortParameters.isUseKettle());
   }
 
   @Override
@@ -102,21 +102,14 @@ public class ParallelReadMergeSorterImpl implements Sorter {
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length);
 
-    // First prepare the data for sort.
-    Iterator<CarbonRowBatch>[] sortPrepIterators = new Iterator[iterators.length];
-    for (int i = 0; i < sortPrepIterators.length; i++) {
-      sortPrepIterators[i] = new SortPreparatorIterator(iterators[i], inputDataFields);
-    }
-
-    for (int i = 0; i < sortDataRows.length; i++) {
-      executorService
-          .submit(new SortIteratorThread(sortPrepIterators[i], sortDataRows[i], sortParameters));
-    }
-
     try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        executorService.submit(
+            new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters));
+      }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
-    } catch (InterruptedException e) {
+    } catch (Exception e) {
       throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
     }
     try {
@@ -182,12 +175,15 @@ public class ParallelReadMergeSorterImpl implements Sorter {
           CarbonRowBatch batch = iterator.next();
           Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
           while (batchIterator.hasNext()) {
-            sortDataRows.addRow(batchIterator.next().getData());
+            CarbonRow row = batchIterator.next();
+            if (row != null) {
+              sortDataRows.addRow(row.getData());
+            }
           }
         }
 
         processRowToNextStep(sortDataRows);
-      } catch (CarbonSortKeyAndGroupByException e) {
+      } catch (Exception e) {
         LOGGER.error(e);
         throw new CarbonDataLoadingException(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
deleted file mode 100644
index 9c4305a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/SortPreparatorIterator.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.newflow.sort.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
-
-import org.apache.commons.lang3.ArrayUtils;
-
-/**
- * This iterator transform the row to how carbon sorter interface expects it.
- * TODO : It supposed to return a comparable ROW which can sort the row.
- */
-public class SortPreparatorIterator extends CarbonIterator<CarbonRowBatch> {
-
-  private Iterator<CarbonRowBatch> iterator;
-
-  private int[] dictionaryFieldIndexes;
-
-  private int[] nonDictionaryFieldIndexes;
-
-  private int[] measueFieldIndexes;
-
-  private int dimIndexInRow = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
-
-  private int byteArrayIndexInRow = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
-
-  private int measureIndexInRow = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
-
-  public SortPreparatorIterator(Iterator<CarbonRowBatch> iterator, DataField[] dataFields) {
-    this.iterator = iterator;
-    List<Integer> dictIndexes = new ArrayList<>();
-    List<Integer> nonDictIndexes = new ArrayList<>();
-    List<Integer> msrIndexes = new ArrayList<>();
-    for (int i = 0; i < dataFields.length; i++) {
-      if (dataFields[i].getColumn().isDimesion()) {
-        if (dataFields[i].hasDictionaryEncoding()) {
-          dictIndexes.add(i);
-        } else {
-          nonDictIndexes.add(i);
-        }
-      } else {
-        msrIndexes.add(i);
-      }
-    }
-    dictionaryFieldIndexes =
-        ArrayUtils.toPrimitive(dictIndexes.toArray(new Integer[dictIndexes.size()]));
-    nonDictionaryFieldIndexes =
-        ArrayUtils.toPrimitive(nonDictIndexes.toArray(new Integer[nonDictIndexes.size()]));
-    measueFieldIndexes = ArrayUtils.toPrimitive(msrIndexes.toArray(new Integer[msrIndexes.size()]));
-  }
-
-  @Override
-  public boolean hasNext() {
-    return iterator.hasNext();
-  }
-
-  @Override
-  public CarbonRowBatch next() {
-    CarbonRowBatch batch = iterator.next();
-    Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
-    while (batchIterator.hasNext()) {
-      Object[] outputArray = new Object[3];
-      CarbonRow row = batchIterator.next();
-      fillDictionaryArrayFromRow(row, outputArray);
-      fillNonDictionaryArrayFromRow(row, outputArray);
-      fillMeasureArrayFromRow(row, outputArray);
-    }
-    return batch;
-  }
-
-  /**
-   * Collect all dictionary values to single integer array and store it in 0 index of out put array.
-   *
-   * @param row
-   * @param outputArray
-   */
-  private void fillDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) {
-    if (dictionaryFieldIndexes.length > 0) {
-      int[] dictArray = new int[dictionaryFieldIndexes.length];
-      for (int i = 0; i < dictionaryFieldIndexes.length; i++) {
-        dictArray[i] = row.getInt(dictionaryFieldIndexes[i]);
-      }
-      outputArray[dimIndexInRow] = dictArray;
-    }
-  }
-
-  /**
-   * collect all non dictionary columns and compose it to single byte array and store it in 1 index
-   * of out put array
-   *
-   * @param row
-   * @param outputArray
-   */
-  private void fillNonDictionaryArrayFromRow(CarbonRow row, Object[] outputArray) {
-    if (nonDictionaryFieldIndexes.length > 0) {
-      byte[][] nonDictByteArray = new byte[nonDictionaryFieldIndexes.length][];
-      for (int i = 0; i < nonDictByteArray.length; i++) {
-        nonDictByteArray[i] = row.getBinary(nonDictionaryFieldIndexes[i]);
-      }
-
-      byte[] nonDictionaryCols =
-          RemoveDictionaryUtil.packByteBufferIntoSingleByteArray(nonDictByteArray);
-      outputArray[byteArrayIndexInRow] = nonDictionaryCols;
-    }
-  }
-
-  /**
-   * Collect all measure values as array and store it in 2 index of out put array.
-   *
-   * @param row
-   * @param outputArray
-   */
-  private void fillMeasureArrayFromRow(CarbonRow row, Object[] outputArray) {
-    if (measueFieldIndexes.length > 0) {
-      Object[] measureArray = new Object[measueFieldIndexes.length];
-      for (int i = 0; i < measueFieldIndexes.length; i++) {
-        measureArray[i] = row.getObject(measueFieldIndexes[i]);
-      }
-      outputArray[measureIndexInRow] = measureArray;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 582b8c1..1c59d29 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -19,13 +19,24 @@
 
 package org.apache.carbondata.processing.newflow.steps;
 
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.processing.constants.LoggerAction;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.newflow.converter.RowConverter;
 import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
 
 /**
  * Replace row data fields with dictionary values if column is configured dictionary encoded.
@@ -33,7 +44,7 @@ import org.apache.carbondata.processing.newflow.row.CarbonRow;
  */
 public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
-  private RowConverter encoder;
+  private RowConverter converter;
 
   public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
@@ -47,20 +58,105 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
 
   @Override
   public void initialize() throws CarbonDataLoadingException {
-    encoder = new RowConverterImpl(child.getOutput(), configuration);
     child.initialize();
+    BadRecordsLogger badRecordLogger = createBadRecordLogger();
+    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converter.initialize();
+  }
+
+  /**
+   * Wraps the child iterator so that every batch it yields is converted by this step.
+   *
+   * @param childIter iterator over the row batches of the child step
+   * @return iterator that passes each child batch through processRowBatch
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      RowConverter localConverter = converter.createCopyForNewThread();
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Converts every row of the batch and collects the results in a new batch.
+   * @param rowBatch batch whose rows are converted
+   * @param localConverter converter applied to each row
+   * @return new batch holding the converted rows
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch();
+    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      newBatch.addRow(localConverter.convert(batchIterator.next()));
+    }
+    return newBatch;
   }
 
   @Override
   protected CarbonRow processRow(CarbonRow row) {
-    return encoder.convert(row);
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Builds the bad record logger from the data load properties. An absent logger-enable
+   * property counts as disabled; an absent or unrecognised action is treated as FORCE.
+   *
+   * @return bad record logger for the table and task of this load
+   */
+  private BadRecordsLogger createBadRecordLogger() {
+    Object enableProperty =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE);
+    Object actionProperty =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION);
+    // Check for null before toString(); calling it on an absent property threw an NPE and
+    // made the later null check on the action unreachable.
+    boolean badRecordsLoggerEnable =
+        null != enableProperty && Boolean.parseBoolean(enableProperty.toString());
+    LoggerAction loggerAction = LoggerAction.FORCE;
+    if (null != actionProperty) {
+      try {
+        loggerAction = LoggerAction.valueOf(actionProperty.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        // Unknown action name: keep the FORCE default.
+        loggerAction = LoggerAction.FORCE;
+      }
+    }
+    // FORCE leaves both flags off; REDIRECT sets both; IGNORE sets only the
+    // convert-null-disable flag.
+    boolean badRecordsLogRedirect = loggerAction == LoggerAction.REDIRECT;
+    boolean badRecordConvertNullDisable =
+        loggerAction == LoggerAction.REDIRECT || loggerAction == LoggerAction.IGNORE;
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    // The relative location is <database>/<table>/<taskNo>; the base location is prepended.
+    String relativeLocation = identifier.getDatabaseName() + File.separator
+        + identifier.getTableName() + File.separator + configuration.getTaskNo();
+    return new BadRecordsLogger(identifier.getBadRecordLoggerKey(),
+        identifier.getTableName() + '_' + System.currentTimeMillis(),
+        getBadLogStoreLocation(relativeLocation), badRecordsLogRedirect, badRecordsLoggerEnable,
+        badRecordConvertNullDisable);
+  }
+
+  /** Joins the configured bad records base location and the given relative location. */
+  private String getBadLogStoreLocation(String storeLocation) {
+    String baseLocation =
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC);
+    // Plain string concatenation: a null base location would appear as the text "null".
+    return baseLocation + File.separator + storeLocation;
   }
 
   @Override
   public void close() {
     super.close();
-    if (encoder != null) {
-      encoder.finish();
+    if (converter != null) {
+      converter.finish();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
new file mode 100644
index 0000000..48492d3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.io.File;
+import java.util.Iterator;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.constants.IgnoreDictionary;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from sorted files which are generated in previous sort step.
+ * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
+ */
+public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
+
+  private SegmentProperties segmentProperties;
+
+  private KeyGenerator keyGenerator;
+
+  private CarbonFactHandler dataHandler;
+
+  private int noDictionaryCount;
+
+  private int complexDimensionCount;
+
+  private int measureCount;
+
+  private long readCounter;
+
+  private long writeCounter;
+
+  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
+
+  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
+
+  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
+
+  private String storeLocation;
+
+  public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  @Override
+  public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  /**
+   * Initializes the child step and makes sure the local data folder of this load exists.
+   */
+  @Override
+  public void initialize() throws CarbonDataLoadingException {
+    child.initialize();
+    CarbonTableIdentifier tableIdentifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+
+    storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
+            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
+            configuration.getPartitionId(), configuration.getSegmentId() + "", false);
+
+    File storeFolder = new File(storeLocation);
+    // mkdirs() also returns false when the folder is already there, so only report an error
+    // when the folder is still missing after the attempt.
+    if (!storeFolder.mkdirs() && !storeFolder.isDirectory()) {
+      LOGGER.error("Local data load folder location does not exist: " + storeLocation);
+    }
+  }
+
+  /**
+   * Pulls every batch from the child step and hands its rows to the fact data handler.
+   *
+   * @return always null.
+   * @throws CarbonDataLoadingException if the handler cannot be set up or a batch fails.
+   */
+  @Override
+  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] iterators = child.execute();
+    String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+    try {
+      CarbonFactDataHandlerModel dataHandlerModel =
+          CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration, storeLocation);
+      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
+      complexDimensionCount = configuration.getComplexDimensionCount();
+      measureCount = dataHandlerModel.getMeasureCount();
+      segmentProperties = dataHandlerModel.getSegmentProperties();
+      keyGenerator = segmentProperties.getDimensionKeyGenerator();
+      dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
+          CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+      dataHandler.initialise();
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+              System.currentTimeMillis());
+      for (Iterator<CarbonRowBatch> iterator : iterators) {
+        while (iterator.hasNext()) {
+          processBatch(iterator.next());
+        }
+      }
+
+    } catch (CarbonDataWriterException e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+      // Pass the cause along so the original stack trace is not lost.
+      throw new CarbonDataLoadingException(
+          "Error while initializing data handler : " + e.getMessage(), e);
+    } catch (Exception e) {
+      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
+      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
+    }
+    return null;
+  }
+
+  /**
+   * Finishes and closes the data handler, then logs the counters and records load statistics.
+   */
+  @Override
+  public void close() {
+    // NOTE(review): unlike DataConverterProcessorStepImpl.close(), super.close() is not
+    // called here - confirm that this is intended.
+    String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
+    // The handler only exists once execute() got far enough to create it.
+    if (null != dataHandler) {
+      try {
+        dataHandler.finish();
+      } catch (Exception e) {
+        LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+      }
+    }
+    LOGGER.info("Record Processed For table: " + tableName);
+    String logMessage =
+        "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
+            + writeCounter;
+    LOGGER.info(logMessage);
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
+    processingComplete();
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+            System.currentTimeMillis());
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+  }
+
+  /**
+   * Closes the data handler if one was created.
+   */
+  private void processingComplete() throws CarbonDataLoadingException {
+    if (null != dataHandler) {
+      try {
+        dataHandler.closeHandler();
+      } catch (CarbonDataWriterException e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonDataLoadingException(e.getMessage(), e);
+      } catch (Exception e) {
+        LOGGER.error(e, e.getMessage());
+        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Turns each row of the batch into the layout handed to the data handler: the measures first,
+   * then the object stored at the no-dictionary byte array index, then the generated mdkey as
+   * the last element.
+   *
+   * @param batch rows to write.
+   * @throws CarbonDataLoadingException if any row fails.
+   */
+  private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingException {
+    Iterator<CarbonRow> iterator = batch.getBatchIterator();
+    try {
+      while (iterator.hasNext()) {
+        CarbonRow row = iterator.next();
+        readCounter++;
+        Object[] outputRow;
+        // adding one for the high cardinality dims byte array.
+        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
+          outputRow = new Object[measureCount + 1 + 1];
+        } else {
+          outputRow = new Object[measureCount + 1];
+        }
+
+        Object[] measures = row.getObjectArray(measureIndex);
+        for (int i = 0; i < measureCount; i++) {
+          outputRow[i] = measures[i];
+        }
+        // Without no-dictionary/complex columns this slot is also the last one and is
+        // overwritten by the mdkey below.
+        outputRow[measureCount] = row.getObject(noDimByteArrayIndex);
+
+        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
+        int[] dimsArray = row.getIntArray(dimsArrayIndex);
+        for (int i = 0; i < highCardExcludedRows.length; i++) {
+          highCardExcludedRows[i] = dimsArray[i];
+        }
+
+        outputRow[outputRow.length - 1] = keyGenerator.generateKey(highCardExcludedRows);
+        dataHandler.addDataToStore(outputRow);
+        writeCounter++;
+      }
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+    }
+  }
+
+  /**
+   * Rows are handled batch-wise in execute(); this hook just returns null.
+   */
+  @Override
+  protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
new file mode 100644
index 0000000..044665c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
@@ -0,0 +1,118 @@
+package org.apache.carbondata.processing.newflow.steps;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+
+/**
+ * Step that only drains the iterators of its child step, one worker thread per iterator,
+ * and discards every row.
+ */
+public class DummyClassForTest extends AbstractDataLoadProcessorStep {
+
+  private ExecutorService executorService;
+
+  public DummyClassForTest(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    super(configuration, child);
+  }
+
+  /**
+   * @return the output fields of the child step, unchanged.
+   */
+  @Override public DataField[] getOutput() {
+    return child.getOutput();
+  }
+
+  /**
+   * Initializes the child step; this step sets up nothing of its own here.
+   */
+  @Override public void initialize() throws CarbonDataLoadingException {
+    child.initialize();
+  }
+
+  /**
+   * Drains every child iterator on its own thread and waits for all of them to finish.
+   *
+   * @return always null.
+   * @throws CarbonDataLoadingException if submitting or waiting fails, or a worker failed.
+   */
+  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] iterators = child.execute();
+    if (iterators.length == 0) {
+      // newFixedThreadPool rejects a pool size of zero and there is nothing to drain anyway.
+      return null;
+    }
+    this.executorService = Executors.newFixedThreadPool(iterators.length);
+    List<Future<Void>> futures = new ArrayList<>(iterators.length);
+
+    try {
+      for (int i = 0; i < iterators.length; i++) {
+        futures.add(executorService.submit(new DummyThread(iterators[i])));
+      }
+      executorService.shutdown();
+      if (executorService.awaitTermination(2, TimeUnit.DAYS)) {
+        // All workers are done, so get() does not block; it only rethrows a worker failure,
+        // which would otherwise stay hidden inside the ignored Future.
+        for (Future<Void> future : futures) {
+          future.get();
+        }
+      }
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller's thread.
+      Thread.currentThread().interrupt();
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException("Problem while shutdown the server ", e);
+    }
+    return null;
+  }
+
+  /**
+   * Rows are drained in execute(); this hook just returns null.
+   */
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+}
+
+/**
+ * This thread iterates the iterator
+ */
+class DummyThread implements Callable<Void> {
+
+  private Iterator<CarbonRowBatch> iterator;
+
+  public DummyThread(Iterator<CarbonRowBatch> iterator) {
+    this.iterator = iterator;
+  }
+
+  @Override public Void call() throws CarbonDataLoadingException {
+    try {
+      while (iterator.hasNext()) {
+        CarbonRowBatch batch = iterator.next();
+        Iterator<CarbonRow> batchIterator = batch.getBatchIterator();
+        while (batchIterator.hasNext()) {
+          // Pull the row and drop it; draining is the whole point.
+          batchIterator.next();
+        }
+      }
+
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index 02b7fee..69bd84a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -11,10 +11,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
-import org.apache.carbondata.processing.newflow.parser.GenericParser;
 import org.apache.carbondata.processing.newflow.parser.RowParser;
 import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
@@ -28,42 +25,24 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(InputProcessorStepImpl.class.getName());
 
-  private GenericParser[] genericParsers;
+  private RowParser rowParser;
 
-  private List<Iterator<Object[]>> inputIterators;
+  private Iterator<Object[]>[] inputIterators;
 
   public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child, List<Iterator<Object[]>> inputIterators) {
-    super(configuration, child);
+      Iterator<Object[]>[] inputIterators) {
+    super(configuration, null);
     this.inputIterators = inputIterators;
   }
 
   @Override
   public DataField[] getOutput() {
-    DataField[] fields = configuration.getDataFields();
-    String[] header = configuration.getHeader();
-    DataField[] output = new DataField[fields.length];
-    int k = 0;
-    for (int i = 0; i < header.length; i++) {
-      for (int j = 0; j < fields.length; j++) {
-        if (header[j].equalsIgnoreCase(fields[j].getColumn().getColName())) {
-          output[k++] = fields[j];
-          break;
-        }
-      }
-    }
-    return output;
+    return configuration.getDataFields();
   }
 
   @Override
   public void initialize() throws CarbonDataLoadingException {
-    DataField[] output = getOutput();
-    genericParsers = new GenericParser[output.length];
-    for (int i = 0; i < genericParsers.length; i++) {
-      genericParsers[i] = CarbonParserFactory.createParser(output[i].getColumn(),
-          (String[]) configuration
-              .getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS));
-    }
+    rowParser = new RowParserImpl(getOutput(), configuration);
   }
 
 
@@ -74,7 +53,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     List<Iterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
     Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
     for (int i = 0; i < outIterators.length; i++) {
-      outIterators[i] = new InputProcessorIterator(readerIterators[i], genericParsers, batchSize);
+      outIterators[i] = new InputProcessorIterator(readerIterators[i], rowParser, batchSize);
     }
     return outIterators;
   }
@@ -88,15 +67,15 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
     // Get the minimum of number of cores and iterators size to get the number of parallel threads
     // to be launched.
-    int parallelThreadNumber = Math.min(inputIterators.size(), numberOfCores);
+    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
 
     List<Iterator<Object[]>>[] iterators = new List[parallelThreadNumber];
     for (int i = 0; i < parallelThreadNumber; i++) {
       iterators[i] = new ArrayList<>();
     }
     // Equally partition the iterators as per number of threads
-    for (int i = 0; i < inputIterators.size(); i++) {
-      iterators[i % parallelThreadNumber].add(inputIterators.get(i));
+    for (int i = 0; i < inputIterators.length; i++) {
+      iterators[i % parallelThreadNumber].add(inputIterators[i]);
     }
     return iterators;
   }
@@ -123,10 +102,10 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
     private RowParser rowParser;
 
     public InputProcessorIterator(List<Iterator<Object[]>> inputIterators,
-        GenericParser[] genericParsers, int batchSize) {
+        RowParser rowParser, int batchSize) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
-      this.rowParser = new RowParserImpl(genericParsers);
+      this.rowParser = rowParser;
       this.counter = 0;
       // Get the first iterator from the list.
       currentIterator = inputIterators.get(counter++);
@@ -145,8 +124,8 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
         if (counter < inputIterators.size()) {
           // Get the next iterator from the list.
           currentIterator = inputIterators.get(counter++);
+          hasNext = internalHasNext();
         }
-        hasNext = internalHasNext();
       }
       return hasNext;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index aae6dae..19d099b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -50,6 +50,7 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
 
   @Override
   public void initialize() throws CarbonDataLoadingException {
+    child.initialize();
     SortParameters sortParameters = SortParameters.createSortParameters(configuration);
     sorter = new ParallelReadMergeSorterImpl(child.getOutput());
     sorter.initialize(sortParameters);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java
deleted file mode 100644
index 51f5ec4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/writer/DataWriterProcessorStepImpl.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.newflow.steps.writer;
-
-import java.io.File;
-import java.util.Iterator;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.constants.IgnoreDictionary;
-import org.apache.carbondata.core.keygenerator.KeyGenerator;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data from sorted files which are generated in previous sort step.
- * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
- */
-public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
-
-  private String storeLocation;
-
-
-  private SegmentProperties segmentProperties;
-
-  private KeyGenerator keyGenerator;
-
-  private CarbonFactHandler dataHandler;
-
-  private int noDictionaryCount;
-
-  private int complexDimensionCount;
-
-  private int measureCount;
-
-  private long readCounter;
-
-  private long writeCounter;
-
-  private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
-
-  private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
-
-  private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
-
-  public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child) {
-    super(configuration, child);
-  }
-
-  @Override
-  public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override
-  public void initialize() throws CarbonDataLoadingException {
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-
-    String storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
-            configuration.getPartitionId(), configuration.getSegmentId() + "", false);
-
-    if (!(new File(storeLocation).exists())) {
-      LOGGER.error("Local data load folder location does not exist: " + storeLocation);
-      return;
-    }
-
-  }
-
-  @Override
-  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
-    Iterator<CarbonRowBatch>[] iterators = child.execute();
-    String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName();
-    try {
-      CarbonFactDataHandlerModel dataHandlerModel =
-          CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration);
-      noDictionaryCount = dataHandlerModel.getNoDictionaryCount();
-      complexDimensionCount = configuration.getComplexDimensionCount();
-      measureCount = dataHandlerModel.getMeasureCount();
-      segmentProperties = dataHandlerModel.getSegmentProperties();
-      keyGenerator = segmentProperties.getDimensionKeyGenerator();
-      dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel,
-          CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
-      dataHandler.initialise();
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
-              System.currentTimeMillis());
-      for (int i = 0; i < iterators.length; i++) {
-        Iterator<CarbonRowBatch> iterator = iterators[i];
-        while (iterator.hasNext()) {
-          processBatch(iterator.next());
-        }
-      }
-
-    } catch (CarbonDataWriterException e) {
-      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
-      throw new CarbonDataLoadingException(
-          "Error while initializing data handler : " + e.getMessage());
-    } catch (Exception e) {
-      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
-      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
-    } finally {
-      try {
-        dataHandler.finish();
-      } catch (CarbonDataWriterException e) {
-        LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
-      } catch (Exception e) {
-        LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
-      }
-    }
-    LOGGER.info("Record Procerssed For table: " + tableName);
-    String logMessage =
-        "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
-            + writeCounter;
-    LOGGER.info(logMessage);
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
-    processingComplete();
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
-            System.currentTimeMillis());
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
-
-    return null;
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  private void processingComplete() throws CarbonDataLoadingException {
-    if (null != dataHandler) {
-      try {
-        dataHandler.closeHandler();
-      } catch (CarbonDataWriterException e) {
-        LOGGER.error(e, e.getMessage());
-        throw new CarbonDataLoadingException(e.getMessage());
-      } catch (Exception e) {
-        LOGGER.error(e, e.getMessage());
-        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
-      }
-    }
-  }
-
-  private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingException {
-    Iterator<CarbonRow> iterator = batch.getBatchIterator();
-    try {
-      while (iterator.hasNext()) {
-        CarbonRow row = iterator.next();
-        readCounter++;
-        Object[] outputRow = null;
-        // adding one for the high cardinality dims byte array.
-        if (noDictionaryCount > 0 || complexDimensionCount > 0) {
-          outputRow = new Object[measureCount + 1 + 1];
-        } else {
-          outputRow = new Object[measureCount + 1];
-        }
-
-        int l = 0;
-        int index = 0;
-        Object[] measures = row.getObjectArray(measureIndex);
-        for (int i = 0; i < measureCount; i++) {
-          outputRow[l++] = measures[index++];
-        }
-        outputRow[l] = row.getBinary(noDimByteArrayIndex);
-
-        int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
-        Integer[] dimsArray = row.getIntegerArray(dimsArrayIndex);
-        for (int i = 0; i < highCardExcludedRows.length; i++) {
-          highCardExcludedRows[i] = dimsArray[i];
-        }
-
-        outputRow[outputRow.length - 1] = keyGenerator.generateKey(highCardExcludedRows);
-        dataHandler.addDataToStore(outputRow);
-        writeCounter++;
-      }
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException("unable to generate the mdkey", e);
-    }
-  }
-
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index cffc00b..f1b062e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -90,6 +90,10 @@ public class IntermediateFileMerger implements Callable<Void> {
 
   private File outPutFile;
 
+  private boolean useKettle;
+
+  private boolean[] noDictionarycolumnMapping;
+
   /**
    * IntermediateFileMerger Constructor
    */
@@ -99,6 +103,8 @@ public class IntermediateFileMerger implements Callable<Void> {
     this.fileCounter = intermediateFiles.length;
     this.intermediateFiles = intermediateFiles;
     this.outPutFile = outPutFile;
+    this.useKettle = mergerParameters.isUseKettle();
+    noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
   }
 
   @Override public Void call() throws Exception {
@@ -108,9 +114,14 @@ public class IntermediateFileMerger implements Callable<Void> {
     try {
       startSorting();
       initialize();
-
-      while (hasNext()) {
-        writeDataTofile(next());
+      if (useKettle) {
+        while (hasNext()) {
+          writeDataTofile(next());
+        }
+      } else {
+        while (hasNext()) {
+          writeDataTofileWithOutKettle(next());
+        }
       }
       if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
         if (entryCount > 0) {
@@ -252,7 +263,8 @@ public class IntermediateFileMerger implements Callable<Void> {
           new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
               mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
               mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
-              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn());
+              mergerParameters.getAggType(), mergerParameters.getNoDictionaryDimnesionColumn(),
+              mergerParameters.isUseKettle());
 
       // initialize
       sortTempFileChunkHolder.initialize();
@@ -300,6 +312,8 @@ public class IntermediateFileMerger implements Callable<Void> {
   /**
    * Below method will be used to write data to file
    *
+   * TODO Remove it after kettle is removed
+   *
    * @throws CarbonSortKeyAndGroupByException problem while writing
    */
   private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
@@ -362,6 +376,74 @@ public class IntermediateFileMerger implements Callable<Void> {
     }
   }
 
+  /**
+   * Writes one merged row to the sort temp file in the non-kettle row layout, where row[0]
+   * holds the dictionary dimension values (int[]) and row[1] the no-dictionary ones (byte[][]).
+   * With compression or prefetch enabled the row is only buffered and the batch is handed to
+   * the writer once totalSize rows are collected; otherwise the fields go straight to stream.
+   *
+   * @param row merged row to persist
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  private void writeDataTofileWithOutKettle(Object[] row) throws CarbonSortKeyAndGroupByException {
+    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
+      if (entryCount == 0) {
+        records = new Object[totalSize][];
+      }
+      records[entryCount++] = row;
+      // Checked on every row, the first included, so a batch size of 1 is flushed as well.
+      if (entryCount == totalSize) {
+        this.writer.writeSortTempFile(records);
+        entryCount = 0;
+        records = new Object[totalSize][];
+      }
+      return;
+    }
+    // Unbuffered path: serialize the row field by field.
+    try {
+      char[] aggType = mergerParameters.getAggType();
+      int[] mdkArray = (int[]) row[0];
+      byte[][] nonDictArray = (byte[][]) row[1];
+      int mdkIndex = 0;
+      int nonDictKeyIndex = 0;
+      // Dimensions in column order: no-dictionary as short length + bytes, dictionary as int.
+      for (boolean nodictinary : noDictionarycolumnMapping) {
+        if (nodictinary) {
+          byte[] col = nonDictArray[nonDictKeyIndex++];
+          stream.writeShort(col.length);
+          stream.write(col);
+        } else {
+          stream.writeInt(mdkArray[mdkIndex++]);
+        }
+      }
+
+      // Measures: a presence byte (1 or 0), followed by the value when present.
+      for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
+        // Look the measure up once instead of once for the null check and once for the write.
+        Object measure = RemoveDictionaryUtil.getMeasure(counter, row);
+        if (null == measure) {
+          stream.write((byte) 0);
+          continue;
+        }
+        stream.write((byte) 1);
+        char type = aggType[counter];
+        if (type == CarbonCommonConstants.BYTE_VALUE_MEASURE
+            || type == CarbonCommonConstants.SUM_COUNT_VALUE_MEASURE) {
+          stream.writeDouble((Double) measure);
+        } else if (type == CarbonCommonConstants.BIG_INT_MEASURE) {
+          stream.writeLong((Long) measure);
+        } else if (type == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
+          byte[] bigDecimalInBytes = (byte[]) measure;
+          stream.writeInt(bigDecimalInBytes.length);
+          stream.write(bigDecimalInBytes);
+        }
+      }
+
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+    }
+  }
+
   private void finish() throws CarbonSortKeyAndGroupByException {
     if (recordHolderHeap != null) {
       int size = recordHolderHeap.size();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
new file mode 100644
index 0000000..ed9c018
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.sortandgroupby.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+/**
+ * Comparator for sort rows that mix dictionary and no-dictionary dimension columns. A row
+ * entry is compared as a byte[] when its column is flagged in the mapping, otherwise as an
+ * int.
+ */
+public class NewRowComparator implements Comparator<Object[]> {
+
+  /**
+   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions;
+   * element i is true when row entry i is a no-dictionary (byte[]) column.
+   */
+  private boolean[] noDictionaryColMaping;
+
+  /**
+   * @param noDictionaryColMaping per-column flags, true for no-dictionary columns
+   */
+  public NewRowComparator(boolean[] noDictionaryColMaping) {
+    this.noDictionaryColMaping = noDictionaryColMaping;
+  }
+
+  /**
+   * Compares two rows column by column and returns at the first column that differs.
+   * No-dictionary columns go through UnsafeComparer; dictionary columns use
+   * {@link Integer#compare(int, int)} instead of subtraction so the sign cannot flip on
+   * int overflow.
+   *
+   * @return negative, zero or positive as rowA sorts before, equal to or after rowB
+   */
+  @Override
+  public int compare(Object[] rowA, Object[] rowB) {
+    int index = 0;
+    for (boolean isNoDictionary : noDictionaryColMaping) {
+      int diff;
+      if (isNoDictionary) {
+        byte[] byteArr1 = (byte[]) rowA[index];
+        byte[] byteArr2 = (byte[]) rowB[index];
+        diff = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+      } else {
+        diff = Integer.compare((int) rowA[index], (int) rowB[index]);
+      }
+      if (diff != 0) {
+        return diff;
+      }
+      index++;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/496cde46/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
new file mode 100644
index 0000000..80aa790
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/NewRowComparatorForNormalDims.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.sortandgroupby.sortdata;
+
+import java.util.Comparator;
+
+/**
+ * Comparator for sort rows whose leading {@code dimensionCount} entries are dictionary
+ * surrogate keys boxed as {@link Integer}; rows are ordered by those keys, left to right.
+ */
+public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
+  /**
+   * Number of leading row entries that take part in the comparison.
+   */
+  private int dimensionCount;
+
+  /**
+   * NewRowComparatorForNormalDims Constructor
+   *
+   * @param dimensionCount number of leading row entries to compare
+   */
+  public NewRowComparatorForNormalDims(int dimensionCount) {
+    this.dimensionCount = dimensionCount;
+  }
+
+  /**
+   * Compares two rows by their first {@code dimensionCount} entries and stops at the first
+   * entry that differs. Uses {@link Integer#compare(int, int)} rather than subtraction so the
+   * sign stays correct even when the difference of two keys would overflow an int.
+   *
+   * @return negative, zero or positive as rowA sorts before, equal to or after rowB
+   * @see Comparator#compare(Object, Object)
+   */
+  @Override
+  public int compare(Object[] rowA, Object[] rowB) {
+    for (int i = 0; i < dimensionCount; i++) {
+      // Integer.compare cannot overflow, unlike dimFieldA - dimFieldB.
+      int diff = Integer.compare((int) rowA[i], (int) rowB[i]);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    return 0;
+  }
+}