You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/14 23:26:51 UTC

[2/3] incubator-carbondata git commit: csvReader code improvements

csvReader code improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d6ceb1d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d6ceb1d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d6ceb1d3

Branch: refs/heads/master
Commit: d6ceb1d3dd5110a85e861b5b5b2dd1021d1714ef
Parents: 25de27f
Author: Jihong Ma <ji...@apache.org>
Authored: Mon Jan 30 21:41:18 2017 -0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Feb 14 15:08:20 2017 -0800

----------------------------------------------------------------------
 .../carbondata/hadoop/CarbonInputFormat.java    |    6 +-
 .../carbondata/hadoop/csv/CSVInputFormat.java   |  288 -----
 .../recorditerator/RecordReaderIterator.java    |   98 --
 .../hadoop/io/BoundedInputStream.java           |  129 ---
 .../hadoop/io/StringArrayWritable.java          |   70 --
 .../hadoop/readsupport/CarbonReadSupport.java   |   15 +-
 .../AbstractDictionaryDecodedReadSupport.java   |   84 --
 .../impl/ArrayWritableReadSupport.java          |   48 -
 .../impl/DictionaryDecodeReadSupport.java       |   94 ++
 .../impl/DictionaryDecodedReadSupportImpl.java  |   34 -
 .../readsupport/impl/RawDataReadSupport.java    |   17 +-
 .../hadoop/csv/CSVInputFormatTest.java          |  169 ---
 hadoop/src/test/resources/data.csv.bz2          |  Bin 10572 -> 0 bytes
 hadoop/src/test/resources/data.csv.gz           |  Bin 14710 -> 0 bytes
 hadoop/src/test/resources/data.csv.lz4          |  Bin 24495 -> 0 bytes
 hadoop/src/test/resources/data.csv.snappy       |  Bin 24263 -> 0 bytes
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |    8 +-
 .../carbondata/spark/util/CommonUtil.scala      |    2 +-
 .../spark/util/GlobalDictionaryUtil.scala       |    4 +-
 .../readsupport/SparkRowReadSupportImpl.java    |    4 +-
 .../readsupport/SparkRowReadSupportImpl.java    |    4 +-
 .../processing/csvload/BoundedInputStream.java  |  129 +++
 .../processing/csvload/CSVInputFormat.java      |  285 +++++
 .../csvload/CSVRecordReaderIterator.java        |   97 ++
 .../processing/csvload/StringArrayWritable.java |   70 ++
 .../processing/csvload/CSVInputFormatTest.java  |  167 +++
 processing/src/test/resources/csv/data.csv      | 1001 ++++++++++++++++++
 processing/src/test/resources/csv/data.csv.bz2  |  Bin 0 -> 10572 bytes
 processing/src/test/resources/csv/data.csv.gz   |  Bin 0 -> 14710 bytes
 processing/src/test/resources/csv/data.csv.lz4  |  Bin 0 -> 24495 bytes
 .../src/test/resources/csv/data.csv.snappy      |  Bin 0 -> 24263 bytes
 31 files changed, 1870 insertions(+), 953 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 4f81438..8187089 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -60,7 +60,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 import org.apache.carbondata.hadoop.util.BlockLevelTraverser;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
@@ -641,7 +641,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
     String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
     //By default it uses dictionary decoder read class
-    CarbonReadSupport readSupport = null;
+    CarbonReadSupport<T> readSupport = null;
     if (readSupportClass != null) {
       try {
         Class<?> myClass = Class.forName(readSupportClass);
@@ -656,7 +656,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         LOG.error("Error while creating " + readSupportClass, ex);
       }
     } else {
-      readSupport = new DictionaryDecodedReadSupportImpl();
+      readSupport = new DictionaryDecodeReadSupport<>();
     }
     return readSupport;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
deleted file mode 100644
index 9e35d13..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.csv;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-import org.apache.carbondata.hadoop.io.BoundedInputStream;
-import org.apache.carbondata.hadoop.io.StringArrayWritable;
-
-import com.univocity.parsers.csv.CsvParser;
-import com.univocity.parsers.csv.CsvParserSettings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.LineReader;
-
-/**
- * An {@link org.apache.hadoop.mapreduce.InputFormat} for csv files.  Files are broken into lines.
- * Values are the line of csv files.
- */
-public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWritable> {
-
-  public static final String DELIMITER = "carbon.csvinputformat.delimiter";
-  public static final String DELIMITER_DEFAULT = ",";
-  public static final String COMMENT = "carbon.csvinputformat.comment";
-  public static final String COMMENT_DEFAULT = "#";
-  public static final String QUOTE = "carbon.csvinputformat.quote";
-  public static final String QUOTE_DEFAULT = "\"";
-  public static final String ESCAPE = "carbon.csvinputformat.escape";
-  public static final String ESCAPE_DEFAULT = "\\";
-  public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
-  public static final boolean HEADER_PRESENT_DEFAULT = false;
-  public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
-  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
-
-  @Override
-  public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
-      TaskAttemptContext context) throws IOException, InterruptedException {
-    return new CSVRecordReader();
-  }
-
-  @Override
-  protected boolean isSplitable(JobContext context, Path file) {
-    final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration())
-        .getCodec(file);
-    if (null == codec) {
-      return true;
-    }
-    return codec instanceof SplittableCompressionCodec;
-  }
-
-  /**
-   * Sets the comment char to configuration. Default it is #.
-   * @param configuration
-   * @param commentChar
-   */
-  public static void setCommentCharacter(Configuration configuration, String commentChar) {
-    if (commentChar != null && !commentChar.isEmpty()) {
-      configuration.set(COMMENT, commentChar);
-    }
-  }
-
-  /**
-   * Sets the delimiter to configuration. Default it is ','
-   * @param configuration
-   * @param delimiter
-   */
-  public static void setCSVDelimiter(Configuration configuration, String delimiter) {
-    if (delimiter != null && !delimiter.isEmpty()) {
-      configuration.set(DELIMITER, delimiter);
-    }
-  }
-
-  /**
-   * Sets the escape character to configuration. Default it is \
-   * @param configuration
-   * @param escapeCharacter
-   */
-  public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
-    if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
-      configuration.set(ESCAPE, escapeCharacter);
-    }
-  }
-
-  /**
-   * Whether header needs to read from csv or not. By default it is false.
-   * @param configuration
-   * @param headerExtractEnable
-   */
-  public static void setHeaderExtractionEnabled(Configuration configuration,
-      boolean headerExtractEnable) {
-    configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
-  }
-
-  /**
-   * Sets the quote character to configuration. Default it is "
-   * @param configuration
-   * @param quoteCharacter
-   */
-  public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
-    if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
-      configuration.set(QUOTE, quoteCharacter);
-    }
-  }
-
-  /**
-   * Sets the read buffer size to configuration.
-   * @param configuration
-   * @param bufferSize
-   */
-  public static void setReadBufferSize(Configuration configuration, String bufferSize) {
-    if (bufferSize != null && !bufferSize.isEmpty()) {
-      configuration.set(READ_BUFFER_SIZE, bufferSize);
-    }
-  }
-
-  /**
-   * Treats value as line in file. Key is null.
-   */
-  public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
-
-    private long start;
-    private long end;
-    private BoundedInputStream boundedInputStream;
-    private Reader reader;
-    private CsvParser csvParser;
-    private StringArrayWritable value;
-    private String[] columns;
-    private Seekable filePosition;
-    private boolean isCompressedInput;
-    private Decompressor decompressor;
-
-    @Override
-    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
-        throws IOException, InterruptedException {
-      FileSplit split = (FileSplit) inputSplit;
-      start = split.getStart();
-      end = start + split.getLength();
-      Path file = split.getPath();
-      Configuration job = context.getConfiguration();
-      CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
-      FileSystem fs = file.getFileSystem(job);
-      int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
-      FSDataInputStream fileIn = fs.open(file, bufferSize);
-      InputStream inputStream;
-      if (codec != null) {
-        isCompressedInput = true;
-        decompressor = CodecPool.getDecompressor(codec);
-        if (codec instanceof SplittableCompressionCodec) {
-          SplitCompressionInputStream scIn = ((SplittableCompressionCodec) codec)
-              .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec
-                  .READ_MODE.BYBLOCK);
-          start = scIn.getAdjustedStart();
-          end = scIn.getAdjustedEnd();
-          if (start != 0) {
-            LineReader lineReader = new LineReader(scIn, 1);
-            start += lineReader.readLine(new Text(), 0);
-          }
-          filePosition = scIn;
-          inputStream = scIn;
-        } else {
-          CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
-          filePosition = cIn;
-          inputStream = cIn;
-        }
-      } else {
-        fileIn.seek(start);
-        if (start != 0) {
-          LineReader lineReader = new LineReader(fileIn, 1);
-          start += lineReader.readLine(new Text(), 0);
-        }
-        boundedInputStream = new BoundedInputStream(fileIn, end - start);
-        filePosition = fileIn;
-        inputStream = boundedInputStream;
-      }
-      reader = new InputStreamReader(inputStream);
-      csvParser = new CsvParser(extractCsvParserSettings(job));
-      csvParser.beginParsing(reader);
-    }
-
-    private CsvParserSettings extractCsvParserSettings(Configuration job) {
-      CsvParserSettings parserSettings = new CsvParserSettings();
-      parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
-      parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
-      parserSettings.setLineSeparatorDetectionEnabled(true);
-      parserSettings.setNullValue("");
-      parserSettings.setIgnoreLeadingWhitespaces(false);
-      parserSettings.setIgnoreTrailingWhitespaces(false);
-      parserSettings.setSkipEmptyLines(false);
-      // TODO get from csv file.
-      parserSettings.setMaxColumns(1000);
-      parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
-      parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
-      if (start == 0) {
-        parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
-            HEADER_PRESENT_DEFAULT));
-      }
-      return parserSettings;
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      columns = csvParser.parseNext();
-      if (columns == null) {
-        value = null;
-        return false;
-      }
-      if (value == null) {
-        value = new StringArrayWritable();
-      }
-      value.set(columns);
-      return true;
-    }
-
-    @Override
-    public NullWritable getCurrentKey() throws IOException, InterruptedException {
-      return NullWritable.get();
-    }
-
-    @Override
-    public StringArrayWritable getCurrentValue() throws IOException, InterruptedException {
-      return value;
-    }
-
-    private long getPos() throws IOException {
-      long retVal = start;
-      if (null != boundedInputStream) {
-        retVal = end - boundedInputStream.getRemaining();
-      } else if (isCompressedInput && null != filePosition) {
-        retVal = filePosition.getPos();
-      }
-      return retVal;
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return start == end ? 0.0F : Math.min(1.0F, (float) (getPos() -
-          start) / (float) (end - start));
-    }
-
-    @Override
-    public void close() throws IOException {
-      try {
-        if (reader != null) {
-          reader.close();
-        }
-      } finally {
-        if (decompressor != null) {
-          CodecPool.returnDecompressor(decompressor);
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
deleted file mode 100644
index 39dd916..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.csv.recorditerator;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.hadoop.io.StringArrayWritable;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * It is wrapper iterator around @{@link RecordReader}.
- */
-public class RecordReaderIterator extends CarbonIterator<Object []> {
-
-  private RecordReader<NullWritable, StringArrayWritable> recordReader;
-
-  /**
-   * It is just a little hack to make recordreader as iterator. Usually we cannot call hasNext
-   * multiple times on record reader as it moves another line. To avoid that situation like hasNext
-   * only tells whether next row is present or not and next will move the pointer to next row after
-   * consuming it.
-   */
-  private boolean isConsumed;
-
-  private InputSplit split;
-
-  private TaskAttemptContext context;
-
-  public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
-      InputSplit split, TaskAttemptContext context) {
-    this.recordReader = recordReader;
-    this.split = split;
-    this.context = context;
-  }
-
-  @Override
-  public boolean hasNext() {
-    try {
-      if (!isConsumed) {
-        isConsumed = recordReader.nextKeyValue();
-        return isConsumed;
-      }
-      return true;
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  @Override
-  public Object[] next() {
-    try {
-      String[] data = recordReader.getCurrentValue().get();
-      isConsumed = false;
-      return data;
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-  }
-
-  @Override
-  public void initialize() {
-    try {
-      recordReader.initialize(split, context);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  public void close() {
-    try {
-      recordReader.close();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java
deleted file mode 100644
index 4451400..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.io;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Custom reader class to read the data from file it will take care of reading
- * till the limit assigned to this class
- */
-public class BoundedInputStream extends InputStream {
-
-  /**
-   * byte value of the new line character
-   */
-  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
-  /**
-   * number of extra character to read
-   */
-  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
-  /**
-   * number of bytes remaining
-   */
-  private long remaining;
-  /**
-   * to check whether end of line is found
-   */
-  private boolean endOfLineFound = false;
-
-  private DataInputStream in;
-
-  public BoundedInputStream(DataInputStream in, long limit) {
-    this.in = in;
-    this.remaining = limit;
-  }
-
-  /**
-   * Below method will be used to read the data from file
-   *
-   * @throws IOException
-   *           problem while reading
-   */
-  @Override
-  public int read() throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      int var1 = this.in.read();
-      if (var1 >= 0) {
-        --this.remaining;
-      }
-
-      return var1;
-    }
-  }
-
-  /**
-   * Below method will be used to read the data from file. If limit reaches in
-   * that case it will read until new line character is reached
-   *
-   * @param buffer
-   *          buffer in which data will be read
-   * @param offset
-   *          from position to buffer will be filled
-   * @param length
-   *          number of character to be read
-   * @throws IOException
-   *           problem while reading
-   */
-  @Override
-  public int read(byte[] buffer, int offset, int length) throws IOException {
-    if (this.remaining == 0) {
-      return -1;
-    } else {
-      if (this.remaining < length) {
-        length = (int) this.remaining;
-      }
-
-      length = this.in.read(buffer, offset, length);
-      if (length >= 0) {
-        this.remaining -= length;
-        if (this.remaining == 0 && !endOfLineFound) {
-          endOfLineFound = true;
-          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-        } else if (endOfLineFound) {
-          int end = offset + length;
-          for (int i = offset; i < end; i++) {
-            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
-              this.remaining = 0;
-              return (i - offset) + 1;
-            }
-          }
-          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
-        }
-      }
-      return length;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (in != null) {
-      in.close();
-    }
-  }
-
-  public long getRemaining() {
-    return  this.remaining;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java
deleted file mode 100644
index 6f5ae43..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * A String sequence that is usable as a key or value.
- */
-public class StringArrayWritable implements Writable {
-  private String[] values;
-
-  public String[] toStrings() {
-    return values;
-  }
-
-  public void set(String[] values) {
-    this.values = values;
-  }
-
-  public String[] get() {
-    return values;
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int length = in.readInt();
-    values = new String[length];
-    for (int i = 0; i < length; i++) {
-      byte[] b = new byte[in.readInt()];
-      in.readFully(b);
-      values[i] = new String(b, Charset.defaultCharset());
-    }
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(values.length);                 // write values
-    for (int i = 0; i < values.length; i++) {
-      byte[] b = values[i].getBytes(Charset.defaultCharset());
-      out.writeInt(b.length);
-      out.write(b);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(values);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
index 0ee23c9..b535aea 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
@@ -22,24 +22,27 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 
 /**
- * It converts to the desired class while reading the rows from RecordReader
+ * This is the interface to convert data read from RecordReader to row representation.
  */
 public interface CarbonReadSupport<T> {
 
   /**
-   * It can use [{@link CarbonColumn}] array to create its own schema to create its row.
+   * Initialization if needed based on the projected column list
    *
-   * @param carbonColumns
+   * @param carbonColumns column list
+   * @param absoluteTableIdentifier table identifier
    */
   void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException;
 
+  /**
+   * convert column data back to row representation
+   * @param data column data
+   */
   T readRow(Object[] data);
 
   /**
-   * This method will be used to clear the dictionary cache and update access count for each
-   * column involved which will be used during eviction of columns from LRU cache if memory
-   * reaches threshold
+   * cleanup step if necessary
    */
   void close();
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
deleted file mode 100644
index 7723737..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-/**
- * Its an abstract class provides necessary information to decode dictionary data
- */
-public abstract class AbstractDictionaryDecodedReadSupport<T> implements CarbonReadSupport<T> {
-
-  protected Dictionary[] dictionaries;
-
-  protected DataType[] dataTypes;
-  /**
-   * carbon columns
-   */
-  protected CarbonColumn[] carbonColumns;
-
-  /**
-   * It would be instantiated in side the task so the dictionary would be loaded inside every mapper
-   * instead of driver.
-   *
-   * @param carbonColumns
-   * @param absoluteTableIdentifier
-   */
-  @Override public void initialize(CarbonColumn[] carbonColumns,
-      AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
-    this.carbonColumns = carbonColumns;
-    dictionaries = new Dictionary[carbonColumns.length];
-    dataTypes = new DataType[carbonColumns.length];
-    for (int i = 0; i < carbonColumns.length; i++) {
-      if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i]
-          .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        CacheProvider cacheProvider = CacheProvider.getInstance();
-        Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
-            .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
-        dataTypes[i] = carbonColumns[i].getDataType();
-        dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
-            absoluteTableIdentifier.getCarbonTableIdentifier(),
-            carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
-      } else {
-        dataTypes[i] = carbonColumns[i].getDataType();
-      }
-    }
-  }
-
-  /**
-   * This method iwll be used to clear the dictionary cache and update access count for each
-   * column involved which will be used during eviction of columns from LRU cache if memory
-   * reaches threshold
-   */
-  @Override public void close() {
-    for (int i = 0; i < dictionaries.length; i++) {
-      CarbonUtil.clearDictionaryCache(dictionaries[i]);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
deleted file mode 100644
index 50272e5..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-import org.apache.hadoop.io.ArrayWritable;
-
-public class ArrayWritableReadSupport implements CarbonReadSupport<ArrayWritable> {
-
-  @Override public void initialize(CarbonColumn[] carbonColumns,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-  }
-
-  @Override public ArrayWritable readRow(Object[] data) {
-
-    String[] writables = new String[data.length];
-    for (int i = 0; i < data.length; i++) {
-      writables[i] = data[i].toString();
-    }
-    return new ArrayWritable(writables);
-  }
-
-  /**
-   * This method iwll be used to clear the dictionary cache and update access count for each
-   * column involved which will be used during eviction of columns from LRU cache if memory
-   * reaches threshold
-   */
-  @Override public void close() {
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
new file mode 100644
index 0000000..43953d0
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.readsupport.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+/**
+ *  This is the class to decode dictionary encoded column data back to its original value.
+ */
+public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
+
+  protected Dictionary[] dictionaries;
+
+  protected DataType[] dataTypes;
+  /**
+   * carbon columns
+   */
+  protected CarbonColumn[] carbonColumns;
+
+  /**
+   * This initialization is done inside executor task
+   * for column dictionary involved in decoding.
+   *
+   * @param carbonColumns column list
+   * @param absoluteTableIdentifier table identifier
+   */
+  @Override public void initialize(CarbonColumn[] carbonColumns,
+      AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
+    this.carbonColumns = carbonColumns;
+    dictionaries = new Dictionary[carbonColumns.length];
+    dataTypes = new DataType[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i]
+          .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        CacheProvider cacheProvider = CacheProvider.getInstance();
+        Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
+            .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+        dataTypes[i] = carbonColumns[i].getDataType();
+        dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
+            absoluteTableIdentifier.getCarbonTableIdentifier(),
+            carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
+      } else {
+        dataTypes[i] = carbonColumns[i].getDataType();
+      }
+    }
+  }
+
+  @Override public T readRow(Object[] data) {
+    assert (data.length == dictionaries.length);
+    for (int i = 0; i < dictionaries.length; i++) {
+      if (dictionaries[i] != null) {
+        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+      }
+    }
+    return (T)data;
+  }
+
+  /**
+   * Bookkeeps the dictionary cache or updates the access count for each
+   * column involved during decode, to facilitate the LRU cache policy if the
+   * memory threshold is reached
+   */
+  @Override public void close() {
+    for (int i = 0; i < dictionaries.length; i++) {
+      CarbonUtil.clearDictionaryCache(dictionaries[i]);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java
deleted file mode 100644
index ec02ab0..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-/**
- * It decodes the dictionary values to actual values.
- */
-public class DictionaryDecodedReadSupportImpl
-    extends AbstractDictionaryDecodedReadSupport<Object[]> {
-
-  @Override public Object[] readRow(Object[] data) {
-    assert (data.length == dictionaries.length);
-    for (int i = 0; i < dictionaries.length; i++) {
-      if (dictionaries[i] != null) {
-        data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
-      }
-    }
-    return data;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
index f627b26..7e7d414 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
@@ -27,26 +27,17 @@ public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
 
   @Override
   public void initialize(CarbonColumn[] carbonColumns,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-  }
+      AbsoluteTableIdentifier absoluteTableIdentifier) { }
 
   /**
-   * Just return same data.
+   * return column data as InternalRow
    *
-   * @param data
-   * @return
+   * @param data column data
    */
   @Override
   public InternalRow readRow(Object[] data) {
     return new GenericInternalRow(data);
   }
 
-  /**
-   * This method iwll be used to clear the dictionary cache and update access count for each
-   * column involved which will be used during eviction of columns from LRU cache if memory
-   * reaches threshold
-   */
-  @Override public void close() {
-
-  }
+  @Override public void close() { }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java
deleted file mode 100644
index 38b8b52..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.csv;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.hadoop.io.StringArrayWritable;
-
-import junit.framework.TestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public class CSVInputFormatTest extends TestCase {
-
-  /**
-   * generate compressed files, no need to call this method.
-   * @throws Exception
-   */
-  public void generateCompressFiles() throws Exception {
-    String pwd = new File("src/test/resources").getCanonicalPath();
-    String inputFile = pwd + "/data.csv";
-    FileInputStream input = new FileInputStream(inputFile);
-    Configuration conf = new Configuration();
-
-    // .gz
-    String outputFile = pwd + "/data.csv.gz";
-    FileOutputStream output = new FileOutputStream(outputFile);
-    GzipCodec gzip = new GzipCodec();
-    gzip.setConf(conf);
-    CompressionOutputStream outputStream = gzip.createOutputStream(output);
-    int i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-    // .bz2
-    input = new FileInputStream(inputFile);
-    outputFile = pwd + "/data.csv.bz2";
-    output = new FileOutputStream(outputFile);
-    BZip2Codec bzip2 = new BZip2Codec();
-    bzip2.setConf(conf);
-    outputStream = bzip2.createOutputStream(output);
-    i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-    // .snappy
-    input = new FileInputStream(inputFile);
-    outputFile = pwd + "/data.csv.snappy";
-    output = new FileOutputStream(outputFile);
-    SnappyCodec snappy = new SnappyCodec();
-    snappy.setConf(conf);
-    outputStream = snappy.createOutputStream(output);
-    i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-    //.lz4
-    input = new FileInputStream(inputFile);
-    outputFile = pwd + "/data.csv.lz4";
-    output = new FileOutputStream(outputFile);
-    Lz4Codec lz4 = new Lz4Codec();
-    lz4.setConf(conf);
-    outputStream = lz4.createOutputStream(output);
-    i = -1;
-    while ((i = input.read()) != -1) {
-      outputStream.write(i);
-    }
-    outputStream.close();
-    input.close();
-
-  }
-
-  /**
-   * CSVCheckMapper check the content of csv files.
-   */
-  public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
-      NullWritable> {
-    @Override
-    protected void map(NullWritable key, StringArrayWritable value, Context context)
-        throws IOException, InterruptedException {
-      String[] columns = value.get();
-      int id = Integer.parseInt(columns[0]);
-      int salary = Integer.parseInt(columns[6]);
-      Assert.assertEquals(id - 1, salary - 15000);
-    }
-  }
-
-  /**
-   * test read csv files
-   * @throws Exception
-   */
-  @Test public void testReadCSVFiles() throws Exception{
-    Configuration conf = new Configuration();
-    prepareConf(conf);
-    Job job = Job.getInstance(conf, "CSVInputFormat_normal");
-    job.setJarByClass(CSVInputFormatTest.class);
-    job.setMapperClass(CSVCheckMapper.class);
-    job.setNumReduceTasks(0);
-    job.setInputFormatClass(CSVInputFormat.class);
-
-    String inputFolder = new File("src/test/resources").getCanonicalPath();
-    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
-    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
-    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
-    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
-    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
-
-    File output = new File("target/output_CSVInputFormatTest");
-    deleteOutput(output);
-    FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
-
-    Assert.assertTrue(job.waitForCompletion(true));
-  }
-
-  private void prepareConf(Configuration conf) {
-    conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
-  }
-
-  private void deleteOutput(File output) {
-    if (output.exists()) {
-      if (output.isDirectory()) {
-        for(File file : output.listFiles()) {
-          deleteOutput(file);
-        }
-        output.delete();
-      } else {
-        output.delete();
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.bz2
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.bz2 b/hadoop/src/test/resources/data.csv.bz2
deleted file mode 100644
index 72ea1b0..0000000
Binary files a/hadoop/src/test/resources/data.csv.bz2 and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.gz
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.gz b/hadoop/src/test/resources/data.csv.gz
deleted file mode 100644
index a523280..0000000
Binary files a/hadoop/src/test/resources/data.csv.gz and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.lz4
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.lz4 b/hadoop/src/test/resources/data.csv.lz4
deleted file mode 100644
index 5288e02..0000000
Binary files a/hadoop/src/test/resources/data.csv.lz4 and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.snappy
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.snappy b/hadoop/src/test/resources/data.csv.snappy
deleted file mode 100644
index ff8c8f0..0000000
Binary files a/hadoop/src/test/resources/data.csv.snappy and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 9ed0913..cb2bd3e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,8 +41,8 @@ import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
-import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.csvload.CSVInputFormat
+import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator
 import org.apache.carbondata.processing.csvreaderstep.BlockDetails
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.DataLoadExecutor
@@ -207,7 +207,7 @@ class NewCarbonDataLoadRDD[K, V](
           val readers =
           split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
           readers.zipWithIndex.map { case (reader, index) =>
-            new RecordReaderIterator(reader,
+            new CSVRecordReaderIterator(reader,
               split.partitionBlocksDetail(index),
               hadoopAttemptContext)
           }
@@ -233,7 +233,7 @@ class NewCarbonDataLoadRDD[K, V](
           val readers =
             split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
           readers.zipWithIndex.map { case (reader, index) =>
-            new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
+            new CSVRecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 2d62fe6..7592e4e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.processing.csvload.CSVInputFormat
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 09c049d..ef759cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -50,8 +50,8 @@ import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
-import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.csvload.CSVInputFormat
+import org.apache.carbondata.processing.csvload.StringArrayWritable
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index d7fce90..7333115 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -25,13 +25,13 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
 import org.apache.spark.unsafe.types.UTF8String;
 
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
+public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
 
   @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 652f56c..b848543 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -21,12 +21,12 @@ import java.io.IOException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<InternalRow> {
+public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<InternalRow> {
 
   @Override public void initialize(CarbonColumn[] carbonColumns,
       AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
new file mode 100644
index 0000000..9f80c07
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Customized reader class to read data from file
+ * until the upper threshold is reached.
+ */
+public class BoundedInputStream extends InputStream {
+
+  /**
+   * byte value of the new line character
+   */
+  private static final byte END_OF_LINE_BYTE_VALUE = '\n';
+
+  /**
+   * number of extra characters to read
+   */
+  private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
+
+  /**
+   * number of bytes remaining
+   */
+  private long remaining;
+  /**
+   * to check whether end of line is found
+   */
+  private boolean endOfLineFound = false;
+
+  private DataInputStream in;
+
+  public BoundedInputStream(DataInputStream in, long limit) {
+    this.in = in;
+    this.remaining = limit;
+  }
+
+  /**
+   * Below method will be used to read the data from file
+   *
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read() throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      int var1 = this.in.read();
+      if (var1 >= 0) {
+        --this.remaining;
+      }
+
+      return var1;
+    }
+  }
+
+  /**
+   * Below method will be used to read the data from file. If the limit is reached,
+   * it will continue reading until a new line character is found
+   *
+   * @param buffer
+   *          buffer in which data will be read
+   * @param offset
+   *          position from which the buffer will be filled
+   * @param length
+   *          number of characters to be read
+   * @throws IOException
+   *           problem while reading
+   */
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    if (this.remaining == 0) {
+      return -1;
+    } else {
+      if (this.remaining < length) {
+        length = (int) this.remaining;
+      }
+
+      length = this.in.read(buffer, offset, length);
+      if (length >= 0) {
+        this.remaining -= length;
+        if (this.remaining == 0 && !endOfLineFound) {
+          endOfLineFound = true;
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        } else if (endOfLineFound) {
+          int end = offset + length;
+          for (int i = offset; i < end; i++) {
+            if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
+              this.remaining = 0;
+              return (i - offset) + 1;
+            }
+          }
+          this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+        }
+      }
+      return length;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+    }
+  }
+
+  public long getRemaining() {
+    return  this.remaining;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
new file mode 100644
index 0000000..f38175d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.InputFormat} for csv files. Each record's key is
+ * {@link NullWritable} and its value is the array of column values parsed from one csv row.
+ */
+public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWritable> {
+
+  public static final String DELIMITER = "carbon.csvinputformat.delimiter";
+  public static final String DELIMITER_DEFAULT = ",";
+  public static final String COMMENT = "carbon.csvinputformat.comment";
+  public static final String COMMENT_DEFAULT = "#";
+  public static final String QUOTE = "carbon.csvinputformat.quote";
+  public static final String QUOTE_DEFAULT = "\"";
+  public static final String ESCAPE = "carbon.csvinputformat.escape";
+  public static final String ESCAPE_DEFAULT = "\\";
+  // NOTE(review): the key is spelled "caron", unlike the other "carbon." keys. The value is
+  // left untouched because renaming a configuration key would break anyone setting it by name.
+  public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
+  public static final boolean HEADER_PRESENT_DEFAULT = false;
+  public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
+  public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+
+  /**
+   * Charset used to decode csv bytes. It is fixed so that the parsed text does not depend on
+   * the default charset of the JVM that happens to run the reader.
+   */
+  private static final Charset CSV_CHARSET = Charset.forName("UTF-8");
+
+  @Override
+  public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    return new CSVRecordReader();
+  }
+
+  /**
+   * A file is splittable when it is not compressed, or when its codec is a
+   * {@link SplittableCompressionCodec}.
+   */
+  @Override
+  protected boolean isSplitable(JobContext context, Path file) {
+    final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration())
+        .getCodec(file);
+    if (null == codec) {
+      return true;
+    }
+    return codec instanceof SplittableCompressionCodec;
+  }
+
+  /**
+   * Sets the comment character in the configuration. The default is #.
+   * A null or empty value is ignored.
+   * @param configuration configuration to update
+   * @param commentChar comment character; only its first character is used by the reader
+   */
+  public static void setCommentCharacter(Configuration configuration, String commentChar) {
+    if (commentChar != null && !commentChar.isEmpty()) {
+      configuration.set(COMMENT, commentChar);
+    }
+  }
+
+  /**
+   * Sets the delimiter in the configuration. The default is ','.
+   * A null or empty value is ignored.
+   * @param configuration configuration to update
+   * @param delimiter delimiter; only its first character is used by the reader
+   */
+  public static void setCSVDelimiter(Configuration configuration, String delimiter) {
+    if (delimiter != null && !delimiter.isEmpty()) {
+      configuration.set(DELIMITER, delimiter);
+    }
+  }
+
+  /**
+   * Sets the escape character in the configuration. The default is \.
+   * A null or empty value is ignored.
+   * @param configuration configuration to update
+   * @param escapeCharacter escape character; only its first character is used by the reader
+   */
+  public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
+    if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
+      configuration.set(ESCAPE, escapeCharacter);
+    }
+  }
+
+  /**
+   * Sets whether the parser should extract a header row from the csv. The default is false.
+   * @param configuration configuration to update
+   * @param headerExtractEnable true to enable header extraction
+   */
+  public static void setHeaderExtractionEnabled(Configuration configuration,
+      boolean headerExtractEnable) {
+    configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
+  }
+
+  /**
+   * Sets the quote character in the configuration. The default is ".
+   * A null or empty value is ignored.
+   * @param configuration configuration to update
+   * @param quoteCharacter quote character; only its first character is used by the reader
+   */
+  public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
+    if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
+      configuration.set(QUOTE, quoteCharacter);
+    }
+  }
+
+  /**
+   * Sets the read buffer size in the configuration. A null or empty value is ignored.
+   * @param configuration configuration to update
+   * @param bufferSize buffer size as a decimal string; it is parsed as an int by the reader
+   */
+  public static void setReadBufferSize(Configuration configuration, String bufferSize) {
+    if (bufferSize != null && !bufferSize.isEmpty()) {
+      configuration.set(READ_BUFFER_SIZE, bufferSize);
+    }
+  }
+
+  /**
+   * Record reader that parses one csv row per record. The key is always
+   * {@link NullWritable}; the value holds the parsed column values.
+   */
+  public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
+
+    private long start;
+    private long end;
+    private BoundedInputStream boundedInputStream;
+    private Reader reader;
+    private CsvParser csvParser;
+    private StringArrayWritable value;
+    private String[] columns;
+    private Seekable filePosition;
+    private boolean isCompressedInput;
+    private Decompressor decompressor;
+
+    /**
+     * Opens the split's file, positions the stream at the first complete line of the split and
+     * starts the csv parser on it. If any step after opening the file fails, the opened stream
+     * and the borrowed decompressor are released before the exception propagates.
+     */
+    @Override
+    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+        throws IOException, InterruptedException {
+      FileSplit split = (FileSplit) inputSplit;
+      start = split.getStart();
+      end = start + split.getLength();
+      Path file = split.getPath();
+      Configuration job = context.getConfiguration();
+      CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
+      FileSystem fs = file.getFileSystem(job);
+      int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
+      FSDataInputStream fileIn = fs.open(file, bufferSize);
+      boolean initialized = false;
+      try {
+        InputStream inputStream;
+        if (codec != null) {
+          isCompressedInput = true;
+          decompressor = CodecPool.getDecompressor(codec);
+          if (codec instanceof SplittableCompressionCodec) {
+            SplitCompressionInputStream scIn = ((SplittableCompressionCodec) codec)
+                .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec
+                    .READ_MODE.BYBLOCK);
+            start = scIn.getAdjustedStart();
+            end = scIn.getAdjustedEnd();
+            if (start != 0) {
+              // Not the first split: drop the leading line, which may be partial.
+              LineReader lineReader = new LineReader(scIn, 1);
+              start += lineReader.readLine(new Text(), 0);
+            }
+            filePosition = scIn;
+            inputStream = scIn;
+          } else {
+            CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
+            filePosition = cIn;
+            inputStream = cIn;
+          }
+        } else {
+          fileIn.seek(start);
+          if (start != 0) {
+            // Not the first split: drop the leading line, which may be partial.
+            LineReader lineReader = new LineReader(fileIn, 1);
+            start += lineReader.readLine(new Text(), 0);
+          }
+          boundedInputStream = new BoundedInputStream(fileIn, end - start);
+          filePosition = fileIn;
+          inputStream = boundedInputStream;
+        }
+        reader = new InputStreamReader(inputStream, CSV_CHARSET);
+        csvParser = new CsvParser(extractCsvParserSettings(job));
+        csvParser.beginParsing(reader);
+        initialized = true;
+      } finally {
+        if (!initialized) {
+          releaseAfterFailedInitialize(fileIn);
+        }
+      }
+    }
+
+    /**
+     * Releases what a failed {@link #initialize} had already acquired: the opened file stream
+     * and the decompressor borrowed from the {@link CodecPool}.
+     */
+    private void releaseAfterFailedInitialize(FSDataInputStream fileIn) {
+      reader = null;
+      try {
+        fileIn.close();
+      } catch (IOException ignored) {
+        // The failure that aborted initialize() is already propagating and is the one the
+        // caller needs to see; a secondary close failure must not replace it.
+      } finally {
+        returnDecompressor();
+      }
+    }
+
+    /**
+     * Hands the decompressor back to the {@link CodecPool} at most once.
+     */
+    private void returnDecompressor() {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+    }
+
+    /**
+     * Builds the parser settings from the job configuration. Only the first character of the
+     * delimiter, comment, quote and escape values is used. Header extraction is only configured
+     * when {@code start} is 0.
+     */
+    private CsvParserSettings extractCsvParserSettings(Configuration job) {
+      CsvParserSettings parserSettings = new CsvParserSettings();
+      parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
+      parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
+      parserSettings.setLineSeparatorDetectionEnabled(true);
+      parserSettings.setNullValue("");
+      parserSettings.setIgnoreLeadingWhitespaces(false);
+      parserSettings.setIgnoreTrailingWhitespaces(false);
+      parserSettings.setSkipEmptyLines(false);
+      // TODO get from csv file.
+      parserSettings.setMaxColumns(1000);
+      parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
+      parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
+      if (start == 0) {
+        parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
+            HEADER_PRESENT_DEFAULT));
+      }
+      return parserSettings;
+    }
+
+    /**
+     * Parses the next row. Returns false, and clears the current value, when the parser has no
+     * more rows.
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      columns = csvParser.parseNext();
+      if (columns == null) {
+        value = null;
+        return false;
+      }
+      if (value == null) {
+        value = new StringArrayWritable();
+      }
+      value.set(columns);
+      return true;
+    }
+
+    @Override
+    public NullWritable getCurrentKey() throws IOException, InterruptedException {
+      return NullWritable.get();
+    }
+
+    @Override
+    public StringArrayWritable getCurrentValue() throws IOException, InterruptedException {
+      return value;
+    }
+
+    /**
+     * Current position used for progress reporting: derived from the bounded stream for
+     * uncompressed input, or from the compression stream's position for compressed input.
+     */
+    private long getPos() throws IOException {
+      long retVal = start;
+      if (null != boundedInputStream) {
+        retVal = end - boundedInputStream.getRemaining();
+      } else if (isCompressedInput && null != filePosition) {
+        retVal = filePosition.getPos();
+      }
+      return retVal;
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return start == end ? 0.0F : Math.min(1.0F, (float) (getPos() -
+          start) / (float) (end - start));
+    }
+
+    /**
+     * Closes the reader and returns the decompressor to the pool. Calling it again is safe:
+     * the references are cleared, so the decompressor is never returned twice.
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        if (reader != null) {
+          reader.close();
+        }
+      } finally {
+        reader = null;
+        returnDecompressor();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
new file mode 100644
index 0000000..10a036a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.csvload;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Iterator wrapper around a {@link RecordReader} that yields the column values of each record.
+ */
+public class CSVRecordReaderIterator extends CarbonIterator<Object []> {
+
+  private RecordReader<NullWritable, StringArrayWritable> recordReader;
+
+  /**
+   * True while a row has been fetched from the record reader by {@link #hasNext()} but not yet
+   * handed out by {@link #next()}. The record reader advances on every nextKeyValue() call, so
+   * this flag lets hasNext() be called repeatedly without skipping rows.
+   */
+  private boolean hasPendingRow;
+
+  private InputSplit split;
+
+  private TaskAttemptContext context;
+
+  public CSVRecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
+      InputSplit split, TaskAttemptContext context) {
+    this.recordReader = recordReader;
+    this.split = split;
+    this.context = context;
+  }
+
+  /**
+   * Fetches the next row from the record reader unless one is already pending.
+   *
+   * @return true if a row is available for {@link #next()}
+   * @throws CarbonDataLoadingException if the record reader fails
+   */
+  @Override
+  public boolean hasNext() {
+    if (hasPendingRow) {
+      return true;
+    }
+    try {
+      hasPendingRow = recordReader.nextKeyValue();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      throw new CarbonDataLoadingException(e);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+    return hasPendingRow;
+  }
+
+  /**
+   * Returns the pending row, fetching one first if {@link #hasNext()} was not called.
+   *
+   * @return the column values of the row
+   * @throws java.util.NoSuchElementException if there are no more rows
+   * @throws CarbonDataLoadingException if the record reader fails
+   */
+  @Override
+  public Object[] next() {
+    // Advance here as well, so next() works without a preceding hasNext() call.
+    if (!hasNext()) {
+      throw new java.util.NoSuchElementException("No more rows in the record reader");
+    }
+    try {
+      String[] data = recordReader.getCurrentValue().get();
+      hasPendingRow = false;
+      return data;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new CarbonDataLoadingException(e);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * Initializes the wrapped record reader with the split and context given at construction.
+   *
+   * @throws CarbonDataLoadingException if the record reader fails to initialize
+   */
+  @Override
+  public void initialize() {
+    try {
+      recordReader.initialize(split, context);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new CarbonDataLoadingException(e);
+    } catch (Exception e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+
+  /**
+   * Closes the wrapped record reader.
+   *
+   * @throws CarbonDataLoadingException if closing fails
+   */
+  @Override
+  public void close() {
+    try {
+      recordReader.close();
+    } catch (IOException e) {
+      throw new CarbonDataLoadingException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
new file mode 100644
index 0000000..7eb3ec9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A String sequence that is usable as a key or value.
+ */
+public class StringArrayWritable implements Writable {
+  private String[] values;
+
+  public String[] toStrings() {
+    return values;
+  }
+
+  public void set(String[] values) {
+    this.values = values;
+  }
+
+  public String[] get() {
+    return values;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int length = in.readInt();
+    values = new String[length];
+    for (int i = 0; i < length; i++) {
+      byte[] b = new byte[in.readInt()];
+      in.readFully(b);
+      values[i] = new String(b, Charset.defaultCharset());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(values.length);                 // write values
+    for (int i = 0; i < values.length; i++) {
+      byte[] b = values[i].getBytes(Charset.defaultCharset());
+      out.writeInt(b.length);
+      out.write(b);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(values);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
new file mode 100644
index 0000000..66aedb6
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class CSVInputFormatTest extends TestCase {
+
+  /**
+   * Generates the compressed variants (.gz, .bz2, .snappy, .lz4) of data.csv.
+   * The test itself does not call this method.
+   * @throws Exception if reading the input or writing any compressed file fails
+   */
+  public void generateCompressFiles() throws Exception {
+    String pwd = new File("src/test/resources/csv").getCanonicalPath();
+    String inputFile = pwd + "/data.csv";
+    Configuration conf = new Configuration();
+
+    // .gz
+    GzipCodec gzip = new GzipCodec();
+    gzip.setConf(conf);
+    compressTo(inputFile, gzip.createOutputStream(new FileOutputStream(pwd + "/data.csv.gz")));
+
+    // .bz2
+    BZip2Codec bzip2 = new BZip2Codec();
+    bzip2.setConf(conf);
+    compressTo(inputFile, bzip2.createOutputStream(new FileOutputStream(pwd + "/data.csv.bz2")));
+
+    // .snappy
+    SnappyCodec snappy = new SnappyCodec();
+    snappy.setConf(conf);
+    compressTo(inputFile,
+        snappy.createOutputStream(new FileOutputStream(pwd + "/data.csv.snappy")));
+
+    // .lz4
+    Lz4Codec lz4 = new Lz4Codec();
+    lz4.setConf(conf);
+    compressTo(inputFile, lz4.createOutputStream(new FileOutputStream(pwd + "/data.csv.lz4")));
+  }
+
+  /**
+   * Copies the whole input file into the given compression stream. Both streams are closed
+   * even when the copy fails.
+   */
+  private static void compressTo(String inputFile, CompressionOutputStream outputStream)
+      throws IOException {
+    try {
+      FileInputStream input = new FileInputStream(inputFile);
+      try {
+        byte[] chunk = new byte[8192];
+        int count;
+        while ((count = input.read(chunk)) != -1) {
+          outputStream.write(chunk, 0, count);
+        }
+      } finally {
+        input.close();
+      }
+    } finally {
+      outputStream.close();
+    }
+  }
+
+  /**
+   * Mapper that checks each csv row: column 0 (id) and column 6 (salary) must satisfy
+   * id - 1 == salary - 15000.
+   */
+  public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
+      NullWritable> {
+    @Override
+    protected void map(NullWritable key, StringArrayWritable value, Context context)
+        throws IOException, InterruptedException {
+      String[] columns = value.get();
+      int id = Integer.parseInt(columns[0]);
+      int salary = Integer.parseInt(columns[6]);
+      Assert.assertEquals(id - 1, salary - 15000);
+    }
+  }
+
+  /**
+   * Runs a map-only job over the plain, bz2 and gz csv files and expects it to complete
+   * successfully.
+   * @throws Exception if the job cannot be set up or run
+   */
+  @Test public void testReadCSVFiles() throws Exception{
+    Configuration conf = new Configuration();
+    prepareConf(conf);
+    Job job = Job.getInstance(conf, "CSVInputFormat_normal");
+    job.setJarByClass(CSVInputFormatTest.class);
+    job.setMapperClass(CSVCheckMapper.class);
+    job.setNumReduceTasks(0);
+    job.setInputFormatClass(CSVInputFormat.class);
+
+    String inputFolder = new File("src/test/resources/csv").getCanonicalPath();
+    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
+    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
+    FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
+    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
+    // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
+
+    File output = new File("target/output_CSVInputFormatTest");
+    deleteOutput(output);
+    FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
+
+    Assert.assertTrue(job.waitForCompletion(true));
+  }
+
+  private void prepareConf(Configuration conf) {
+    conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
+  }
+
+  /**
+   * Recursively deletes the given file or directory if it exists.
+   */
+  private void deleteOutput(File output) {
+    if (!output.exists()) {
+      return;
+    }
+    if (output.isDirectory()) {
+      // listFiles() returns null when the directory cannot be listed.
+      File[] children = output.listFiles();
+      if (children != null) {
+        for (File child : children) {
+          deleteOutput(child);
+        }
+      }
+    }
+    output.delete();
+  }
+}