Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/02/14 23:26:51 UTC
[2/3] incubator-carbondata git commit: csvReader code improvements
csvReader code improvements
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d6ceb1d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d6ceb1d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d6ceb1d3
Branch: refs/heads/master
Commit: d6ceb1d3dd5110a85e861b5b5b2dd1021d1714ef
Parents: 25de27f
Author: Jihong Ma <ji...@apache.org>
Authored: Mon Jan 30 21:41:18 2017 -0800
Committer: jackylk <ja...@huawei.com>
Committed: Tue Feb 14 15:08:20 2017 -0800
----------------------------------------------------------------------
.../carbondata/hadoop/CarbonInputFormat.java | 6 +-
.../carbondata/hadoop/csv/CSVInputFormat.java | 288 -----
.../recorditerator/RecordReaderIterator.java | 98 --
.../hadoop/io/BoundedInputStream.java | 129 ---
.../hadoop/io/StringArrayWritable.java | 70 --
.../hadoop/readsupport/CarbonReadSupport.java | 15 +-
.../AbstractDictionaryDecodedReadSupport.java | 84 --
.../impl/ArrayWritableReadSupport.java | 48 -
.../impl/DictionaryDecodeReadSupport.java | 94 ++
.../impl/DictionaryDecodedReadSupportImpl.java | 34 -
.../readsupport/impl/RawDataReadSupport.java | 17 +-
.../hadoop/csv/CSVInputFormatTest.java | 169 ---
hadoop/src/test/resources/data.csv.bz2 | Bin 10572 -> 0 bytes
hadoop/src/test/resources/data.csv.gz | Bin 14710 -> 0 bytes
hadoop/src/test/resources/data.csv.lz4 | Bin 24495 -> 0 bytes
hadoop/src/test/resources/data.csv.snappy | Bin 24263 -> 0 bytes
.../spark/rdd/NewCarbonDataLoadRDD.scala | 8 +-
.../carbondata/spark/util/CommonUtil.scala | 2 +-
.../spark/util/GlobalDictionaryUtil.scala | 4 +-
.../readsupport/SparkRowReadSupportImpl.java | 4 +-
.../readsupport/SparkRowReadSupportImpl.java | 4 +-
.../processing/csvload/BoundedInputStream.java | 129 +++
.../processing/csvload/CSVInputFormat.java | 285 +++++
.../csvload/CSVRecordReaderIterator.java | 97 ++
.../processing/csvload/StringArrayWritable.java | 70 ++
.../processing/csvload/CSVInputFormatTest.java | 167 +++
processing/src/test/resources/csv/data.csv | 1001 ++++++++++++++++++
processing/src/test/resources/csv/data.csv.bz2 | Bin 0 -> 10572 bytes
processing/src/test/resources/csv/data.csv.gz | Bin 0 -> 14710 bytes
processing/src/test/resources/csv/data.csv.lz4 | Bin 0 -> 24495 bytes
.../src/test/resources/csv/data.csv.snappy | Bin 0 -> 24263 bytes
31 files changed, 1870 insertions(+), 953 deletions(-)
----------------------------------------------------------------------
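In summary (as the diffstat above shows), this patch moves the CSV reading classes (CSVInputFormat, BoundedInputStream, StringArrayWritable) from the hadoop module into processing/csvload, renames RecordReaderIterator to CSVRecordReaderIterator, collapses AbstractDictionaryDecodedReadSupport and DictionaryDecodedReadSupportImpl into a single DictionaryDecodeReadSupport, and relocates the CSV test data to processing/src/test/resources/csv.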
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 4f81438..8187089 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -60,7 +60,7 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonStorePath;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.BlockLevelTraverser;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
@@ -641,7 +641,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
//By default it uses dictionary decoder read class
- CarbonReadSupport readSupport = null;
+ CarbonReadSupport<T> readSupport = null;
if (readSupportClass != null) {
try {
Class<?> myClass = Class.forName(readSupportClass);
@@ -656,7 +656,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
LOG.error("Error while creating " + readSupportClass, ex);
}
} else {
- readSupport = new DictionaryDecodedReadSupportImpl();
+ readSupport = new DictionaryDecodeReadSupport<>();
}
return readSupport;
}
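A minimal sketch of how a caller selects a read support through this hook. Note the literal configuration key below is an assumption for illustration: the CARBON_READ_SUPPORT constant's string value does not appear in this hunk.

    import org.apache.hadoop.conf.Configuration;

    public class ReadSupportSelectionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Hypothetical key string standing in for CarbonInputFormat.CARBON_READ_SUPPORT;
        // getReadSupportClass() instantiates this class reflectively via Class.forName.
        conf.set("carbon.read.support.class",
            "org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport");
        // If the key is left unset, getReadSupportClass() now falls back to
        // new DictionaryDecodeReadSupport<>() instead of DictionaryDecodedReadSupportImpl.
      }
    }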
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
deleted file mode 100644
index 9e35d13..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/CSVInputFormat.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.csv;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-import org.apache.carbondata.hadoop.io.BoundedInputStream;
-import org.apache.carbondata.hadoop.io.StringArrayWritable;
-
-import com.univocity.parsers.csv.CsvParser;
-import com.univocity.parsers.csv.CsvParserSettings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CodecPool;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.util.LineReader;
-
-/**
- * An {@link org.apache.hadoop.mapreduce.InputFormat} for csv files. Files are broken into lines.
- * Values are the line of csv files.
- */
-public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWritable> {
-
- public static final String DELIMITER = "carbon.csvinputformat.delimiter";
- public static final String DELIMITER_DEFAULT = ",";
- public static final String COMMENT = "carbon.csvinputformat.comment";
- public static final String COMMENT_DEFAULT = "#";
- public static final String QUOTE = "carbon.csvinputformat.quote";
- public static final String QUOTE_DEFAULT = "\"";
- public static final String ESCAPE = "carbon.csvinputformat.escape";
- public static final String ESCAPE_DEFAULT = "\\";
- public static final String HEADER_PRESENT = "caron.csvinputformat.header.present";
- public static final boolean HEADER_PRESENT_DEFAULT = false;
- public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
- public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
-
- @Override
- public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext context) throws IOException, InterruptedException {
- return new CSVRecordReader();
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path file) {
- final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration())
- .getCodec(file);
- if (null == codec) {
- return true;
- }
- return codec instanceof SplittableCompressionCodec;
- }
-
- /**
- * Sets the comment char to configuration. Default it is #.
- * @param configuration
- * @param commentChar
- */
- public static void setCommentCharacter(Configuration configuration, String commentChar) {
- if (commentChar != null && !commentChar.isEmpty()) {
- configuration.set(COMMENT, commentChar);
- }
- }
-
- /**
- * Sets the delimiter to configuration. Default it is ','
- * @param configuration
- * @param delimiter
- */
- public static void setCSVDelimiter(Configuration configuration, String delimiter) {
- if (delimiter != null && !delimiter.isEmpty()) {
- configuration.set(DELIMITER, delimiter);
- }
- }
-
- /**
- * Sets the escape character to configuration. Default it is \
- * @param configuration
- * @param escapeCharacter
- */
- public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
- if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
- configuration.set(ESCAPE, escapeCharacter);
- }
- }
-
- /**
- * Whether header needs to read from csv or not. By default it is false.
- * @param configuration
- * @param headerExtractEnable
- */
- public static void setHeaderExtractionEnabled(Configuration configuration,
- boolean headerExtractEnable) {
- configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
- }
-
- /**
- * Sets the quote character to configuration. Default it is "
- * @param configuration
- * @param quoteCharacter
- */
- public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
- if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
- configuration.set(QUOTE, quoteCharacter);
- }
- }
-
- /**
- * Sets the read buffer size to configuration.
- * @param configuration
- * @param bufferSize
- */
- public static void setReadBufferSize(Configuration configuration, String bufferSize) {
- if (bufferSize != null && !bufferSize.isEmpty()) {
- configuration.set(READ_BUFFER_SIZE, bufferSize);
- }
- }
-
- /**
- * Treats value as line in file. Key is null.
- */
- public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
-
- private long start;
- private long end;
- private BoundedInputStream boundedInputStream;
- private Reader reader;
- private CsvParser csvParser;
- private StringArrayWritable value;
- private String[] columns;
- private Seekable filePosition;
- private boolean isCompressedInput;
- private Decompressor decompressor;
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext context)
- throws IOException, InterruptedException {
- FileSplit split = (FileSplit) inputSplit;
- start = split.getStart();
- end = start + split.getLength();
- Path file = split.getPath();
- Configuration job = context.getConfiguration();
- CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
- FileSystem fs = file.getFileSystem(job);
- int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
- FSDataInputStream fileIn = fs.open(file, bufferSize);
- InputStream inputStream;
- if (codec != null) {
- isCompressedInput = true;
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- SplitCompressionInputStream scIn = ((SplittableCompressionCodec) codec)
- .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec
- .READ_MODE.BYBLOCK);
- start = scIn.getAdjustedStart();
- end = scIn.getAdjustedEnd();
- if (start != 0) {
- LineReader lineReader = new LineReader(scIn, 1);
- start += lineReader.readLine(new Text(), 0);
- }
- filePosition = scIn;
- inputStream = scIn;
- } else {
- CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
- filePosition = cIn;
- inputStream = cIn;
- }
- } else {
- fileIn.seek(start);
- if (start != 0) {
- LineReader lineReader = new LineReader(fileIn, 1);
- start += lineReader.readLine(new Text(), 0);
- }
- boundedInputStream = new BoundedInputStream(fileIn, end - start);
- filePosition = fileIn;
- inputStream = boundedInputStream;
- }
- reader = new InputStreamReader(inputStream);
- csvParser = new CsvParser(extractCsvParserSettings(job));
- csvParser.beginParsing(reader);
- }
-
- private CsvParserSettings extractCsvParserSettings(Configuration job) {
- CsvParserSettings parserSettings = new CsvParserSettings();
- parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
- parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
- parserSettings.setLineSeparatorDetectionEnabled(true);
- parserSettings.setNullValue("");
- parserSettings.setIgnoreLeadingWhitespaces(false);
- parserSettings.setIgnoreTrailingWhitespaces(false);
- parserSettings.setSkipEmptyLines(false);
- // TODO get from csv file.
- parserSettings.setMaxColumns(1000);
- parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
- parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
- if (start == 0) {
- parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
- HEADER_PRESENT_DEFAULT));
- }
- return parserSettings;
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- columns = csvParser.parseNext();
- if (columns == null) {
- value = null;
- return false;
- }
- if (value == null) {
- value = new StringArrayWritable();
- }
- value.set(columns);
- return true;
- }
-
- @Override
- public NullWritable getCurrentKey() throws IOException, InterruptedException {
- return NullWritable.get();
- }
-
- @Override
- public StringArrayWritable getCurrentValue() throws IOException, InterruptedException {
- return value;
- }
-
- private long getPos() throws IOException {
- long retVal = start;
- if (null != boundedInputStream) {
- retVal = end - boundedInputStream.getRemaining();
- } else if (isCompressedInput && null != filePosition) {
- retVal = filePosition.getPos();
- }
- return retVal;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return start == end ? 0.0F : Math.min(1.0F, (float) (getPos() -
- start) / (float) (end - start));
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (reader != null) {
- reader.close();
- }
- } finally {
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- }
- }
- }
- }
-}
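This class moves essentially unchanged to org.apache.carbondata.processing.csvload (the new copy appears later in this patch). A minimal sketch of configuring it through its static setters, following the pattern of the test at the end of this diff:

    import org.apache.carbondata.processing.csvload.CSVInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class CsvJobSketch {
      public static Job newCsvJob(String inputPath) throws Exception {
        Configuration conf = new Configuration();
        // Defaults are ',' delimiter, '"' quote, '\' escape, '#' comment, no header.
        CSVInputFormat.setCSVDelimiter(conf, ",");
        CSVInputFormat.setQuoteCharacter(conf, "\"");
        CSVInputFormat.setHeaderExtractionEnabled(conf, true);
        CSVInputFormat.setReadBufferSize(conf, CSVInputFormat.READ_BUFFER_SIZE_DEFAULT);
        Job job = Job.getInstance(conf, "csv-read-sketch");
        job.setInputFormatClass(CSVInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        return job;
      }
    }

Each record then arrives in the mapper as a StringArrayWritable holding the parsed columns of one line, with a NullWritable key.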
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
deleted file mode 100644
index 39dd916..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/csv/recorditerator/RecordReaderIterator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.hadoop.csv.recorditerator;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.hadoop.io.StringArrayWritable;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-/**
- * It is wrapper iterator around @{@link RecordReader}.
- */
-public class RecordReaderIterator extends CarbonIterator<Object []> {
-
- private RecordReader<NullWritable, StringArrayWritable> recordReader;
-
- /**
- * It is just a little hack to make recordreader as iterator. Usually we cannot call hasNext
- * multiple times on record reader as it moves another line. To avoid that situation like hasNext
- * only tells whether next row is present or not and next will move the pointer to next row after
- * consuming it.
- */
- private boolean isConsumed;
-
- private InputSplit split;
-
- private TaskAttemptContext context;
-
- public RecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
- InputSplit split, TaskAttemptContext context) {
- this.recordReader = recordReader;
- this.split = split;
- this.context = context;
- }
-
- @Override
- public boolean hasNext() {
- try {
- if (!isConsumed) {
- isConsumed = recordReader.nextKeyValue();
- return isConsumed;
- }
- return true;
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- @Override
- public Object[] next() {
- try {
- String[] data = recordReader.getCurrentValue().get();
- isConsumed = false;
- return data;
- } catch (Exception e) {
- throw new CarbonDataLoadingException(e);
- }
- }
-
- @Override
- public void initialize() {
- try {
- recordReader.initialize(split, context);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- try {
- recordReader.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-}
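The consumed-flag hack above makes hasNext() safe to call repeatedly. A usage sketch against the renamed CSVRecordReaderIterator, assuming the constructor keeps the same signature (the Scala call sites later in this patch suggest it does):

    import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator;
    import org.apache.carbondata.processing.csvload.StringArrayWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class IteratorSketch {
      static void drain(RecordReader<NullWritable, StringArrayWritable> reader,
          InputSplit split, TaskAttemptContext context) {
        CSVRecordReaderIterator it = new CSVRecordReaderIterator(reader, split, context);
        it.initialize();              // delegates to recordReader.initialize(split, context)
        try {
          while (it.hasNext()) {      // fetches at most one row ahead via the consumed flag
            Object[] row = it.next(); // returns the parsed columns, resets the flag
            // process row ...
          }
        } finally {
          it.close();                 // closes the underlying RecordReader
        }
      }
    }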
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java
deleted file mode 100644
index 4451400..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/BoundedInputStream.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.io;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Custom reader class to read the data from file it will take care of reading
- * till the limit assigned to this class
- */
-public class BoundedInputStream extends InputStream {
-
- /**
- * byte value of the new line character
- */
- private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
- /**
- * number of extra character to read
- */
- private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
- /**
- * number of bytes remaining
- */
- private long remaining;
- /**
- * to check whether end of line is found
- */
- private boolean endOfLineFound = false;
-
- private DataInputStream in;
-
- public BoundedInputStream(DataInputStream in, long limit) {
- this.in = in;
- this.remaining = limit;
- }
-
- /**
- * Below method will be used to read the data from file
- *
- * @throws IOException
- * problem while reading
- */
- @Override
- public int read() throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- int var1 = this.in.read();
- if (var1 >= 0) {
- --this.remaining;
- }
-
- return var1;
- }
- }
-
- /**
- * Below method will be used to read the data from file. If limit reaches in
- * that case it will read until new line character is reached
- *
- * @param buffer
- * buffer in which data will be read
- * @param offset
- * from position to buffer will be filled
- * @param length
- * number of character to be read
- * @throws IOException
- * problem while reading
- */
- @Override
- public int read(byte[] buffer, int offset, int length) throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- if (this.remaining < length) {
- length = (int) this.remaining;
- }
-
- length = this.in.read(buffer, offset, length);
- if (length >= 0) {
- this.remaining -= length;
- if (this.remaining == 0 && !endOfLineFound) {
- endOfLineFound = true;
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- } else if (endOfLineFound) {
- int end = offset + length;
- for (int i = offset; i < end; i++) {
- if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
- this.remaining = 0;
- return (i - offset) + 1;
- }
- }
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- }
- }
- return length;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
-
- public long getRemaining() {
- return this.remaining;
- }
-
-}
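The limit handling here is the subtle part: once remaining reaches zero mid-record, the stream grants itself NUMBER_OF_EXTRA_CHARACTER_TO_READ more bytes at a time until it passes a '\n', so a file split never cuts a CSV row in half. A small demonstration using the relocated class (behavior traced from the read(byte[], int, int) method above):

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;

    import org.apache.carbondata.processing.csvload.BoundedInputStream;

    public class BoundedStreamSketch {
      public static void main(String[] args) throws Exception {
        byte[] data = "aaa,bbb\nccc,ddd\n".getBytes("UTF-8");
        // A limit of 4 falls in the middle of the first record ...
        BoundedInputStream in = new BoundedInputStream(
            new DataInputStream(new ByteArrayInputStream(data)), 4);
        byte[] buf = new byte[64];
        int total = 0;
        int n;
        while ((n = in.read(buf, total, buf.length - total)) != -1) {
          total += n;
        }
        // ... but the stream keeps reading to the newline at offset 7,
        // so the caller receives the complete first record "aaa,bbb\n".
        System.out.println(new String(buf, 0, total, "UTF-8"));
        in.close();
      }
    }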
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java
deleted file mode 100644
index 6f5ae43..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/io/StringArrayWritable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * A String sequence that is usable as a key or value.
- */
-public class StringArrayWritable implements Writable {
- private String[] values;
-
- public String[] toStrings() {
- return values;
- }
-
- public void set(String[] values) {
- this.values = values;
- }
-
- public String[] get() {
- return values;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- int length = in.readInt();
- values = new String[length];
- for (int i = 0; i < length; i++) {
- byte[] b = new byte[in.readInt()];
- in.readFully(b);
- values[i] = new String(b, Charset.defaultCharset());
- }
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeInt(values.length); // write values
- for (int i = 0; i < values.length; i++) {
- byte[] b = values[i].getBytes(Charset.defaultCharset());
- out.writeInt(b.length);
- out.write(b);
- }
- }
-
- @Override
- public String toString() {
- return Arrays.toString(values);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
index 0ee23c9..b535aea 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java
@@ -22,24 +22,27 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
/**
- * It converts to the desired class while reading the rows from RecordReader
+ * This is the interface for converting data read from a RecordReader to a row representation.
*/
public interface CarbonReadSupport<T> {
/**
- * It can use [{@link CarbonColumn}] array to create its own schema to create its row.
+ * Initialization if needed based on the projected column list
*
- * @param carbonColumns
+ * @param carbonColumns column list
+ * @param absoluteTableIdentifier table identifier
*/
void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException;
+ /**
+ * convert column data back to row representation
+ * @param data column data
+ */
T readRow(Object[] data);
/**
- * This method will be used to clear the dictionary cache and update access count for each
- * column involved which will be used during eviction of columns from LRU cache if memory
- * reaches threshold
+ * cleanup step if necessary
*/
void close();
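Under the reworded contract — initialize with the projected columns, map each Object[] through readRow, release resources in close — a minimal pass-through implementation looks like this (illustrative only; RawDataReadSupport below plays the analogous role for Spark's InternalRow):

    import java.io.IOException;

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
    import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;

    /** Passes rows through unchanged; no dictionary state to set up or release. */
    public class IdentityReadSupport implements CarbonReadSupport<Object[]> {

      @Override public void initialize(CarbonColumn[] carbonColumns,
          AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
        // nothing to initialize for a pass-through
      }

      @Override public Object[] readRow(Object[] data) {
        return data;
      }

      @Override public void close() {
        // no cached dictionaries to clear
      }
    }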
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
deleted file mode 100644
index 7723737..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-import java.io.IOException;
-
-import org.apache.carbondata.core.cache.Cache;
-import org.apache.carbondata.core.cache.CacheProvider;
-import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.cache.dictionary.Dictionary;
-import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-/**
- * Its an abstract class provides necessary information to decode dictionary data
- */
-public abstract class AbstractDictionaryDecodedReadSupport<T> implements CarbonReadSupport<T> {
-
- protected Dictionary[] dictionaries;
-
- protected DataType[] dataTypes;
- /**
- * carbon columns
- */
- protected CarbonColumn[] carbonColumns;
-
- /**
- * It would be instantiated in side the task so the dictionary would be loaded inside every mapper
- * instead of driver.
- *
- * @param carbonColumns
- * @param absoluteTableIdentifier
- */
- @Override public void initialize(CarbonColumn[] carbonColumns,
- AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
- this.carbonColumns = carbonColumns;
- dictionaries = new Dictionary[carbonColumns.length];
- dataTypes = new DataType[carbonColumns.length];
- for (int i = 0; i < carbonColumns.length; i++) {
- if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i]
- .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
- CacheProvider cacheProvider = CacheProvider.getInstance();
- Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
- .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
- dataTypes[i] = carbonColumns[i].getDataType();
- dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
- absoluteTableIdentifier.getCarbonTableIdentifier(),
- carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
- } else {
- dataTypes[i] = carbonColumns[i].getDataType();
- }
- }
- }
-
- /**
- * This method iwll be used to clear the dictionary cache and update access count for each
- * column involved which will be used during eviction of columns from LRU cache if memory
- * reaches threshold
- */
- @Override public void close() {
- for (int i = 0; i < dictionaries.length; i++) {
- CarbonUtil.clearDictionaryCache(dictionaries[i]);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
deleted file mode 100644
index 50272e5..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-
-import org.apache.hadoop.io.ArrayWritable;
-
-public class ArrayWritableReadSupport implements CarbonReadSupport<ArrayWritable> {
-
- @Override public void initialize(CarbonColumn[] carbonColumns,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- }
-
- @Override public ArrayWritable readRow(Object[] data) {
-
- String[] writables = new String[data.length];
- for (int i = 0; i < data.length; i++) {
- writables[i] = data[i].toString();
- }
- return new ArrayWritable(writables);
- }
-
- /**
- * This method iwll be used to clear the dictionary cache and update access count for each
- * column involved which will be used during eviction of columns from LRU cache if memory
- * reaches threshold
- */
- @Override public void close() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
new file mode 100644
index 0000000..43953d0
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodeReadSupport.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hadoop.readsupport.impl;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+/**
+ * This is the class that decodes dictionary-encoded column data back to its original values.
+ */
+public class DictionaryDecodeReadSupport<T> implements CarbonReadSupport<T> {
+
+ protected Dictionary[] dictionaries;
+
+ protected DataType[] dataTypes;
+ /**
+ * carbon columns
+ */
+ protected CarbonColumn[] carbonColumns;
+
+ /**
+ * This initialization is done inside the executor task,
+ * loading the dictionary for each column involved in decoding.
+ *
+ * @param carbonColumns column list
+ * @param absoluteTableIdentifier table identifier
+ */
+ @Override public void initialize(CarbonColumn[] carbonColumns,
+ AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
+ this.carbonColumns = carbonColumns;
+ dictionaries = new Dictionary[carbonColumns.length];
+ dataTypes = new DataType[carbonColumns.length];
+ for (int i = 0; i < carbonColumns.length; i++) {
+ if (carbonColumns[i].hasEncoding(Encoding.DICTIONARY) && !carbonColumns[i]
+ .hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+ CacheProvider cacheProvider = CacheProvider.getInstance();
+ Cache<DictionaryColumnUniqueIdentifier, Dictionary> forwardDictionaryCache = cacheProvider
+ .createCache(CacheType.FORWARD_DICTIONARY, absoluteTableIdentifier.getStorePath());
+ dataTypes[i] = carbonColumns[i].getDataType();
+ dictionaries[i] = forwardDictionaryCache.get(new DictionaryColumnUniqueIdentifier(
+ absoluteTableIdentifier.getCarbonTableIdentifier(),
+ carbonColumns[i].getColumnIdentifier(), dataTypes[i]));
+ } else {
+ dataTypes[i] = carbonColumns[i].getDataType();
+ }
+ }
+ }
+
+ @Override public T readRow(Object[] data) {
+ assert (data.length == dictionaries.length);
+ for (int i = 0; i < dictionaries.length; i++) {
+ if (dictionaries[i] != null) {
+ data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
+ }
+ }
+ return (T)data;
+ }
+
+ /**
+ * Clears the dictionary cache and updates the access count for each
+ * column involved in decoding, to facilitate the LRU eviction policy if the
+ * memory threshold is reached.
+ */
+ @Override public void close() {
+ for (int i = 0; i < dictionaries.length; i++) {
+ CarbonUtil.clearDictionaryCache(dictionaries[i]);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java
deleted file mode 100644
index ec02ab0..0000000
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/DictionaryDecodedReadSupportImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.readsupport.impl;
-
-/**
- * It decodes the dictionary values to actual values.
- */
-public class DictionaryDecodedReadSupportImpl
- extends AbstractDictionaryDecodedReadSupport<Object[]> {
-
- @Override public Object[] readRow(Object[] data) {
- assert (data.length == dictionaries.length);
- for (int i = 0; i < dictionaries.length; i++) {
- if (dictionaries[i] != null) {
- data[i] = dictionaries[i].getDictionaryValueForKey((int) data[i]);
- }
- }
- return data;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
index f627b26..7e7d414 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java
@@ -27,26 +27,17 @@ public class RawDataReadSupport implements CarbonReadSupport<InternalRow> {
@Override
public void initialize(CarbonColumn[] carbonColumns,
- AbsoluteTableIdentifier absoluteTableIdentifier) {
- }
+ AbsoluteTableIdentifier absoluteTableIdentifier) { }
/**
- * Just return same data.
+ * return column data as InternalRow
*
- * @param data
- * @return
+ * @param data column data
*/
@Override
public InternalRow readRow(Object[] data) {
return new GenericInternalRow(data);
}
- /**
- * This method iwll be used to clear the dictionary cache and update access count for each
- * column involved which will be used during eviction of columns from LRU cache if memory
- * reaches threshold
- */
- @Override public void close() {
-
- }
+ @Override public void close() { }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java
deleted file mode 100644
index 38b8b52..0000000
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/csv/CSVInputFormatTest.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.hadoop.csv;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-
-import org.apache.carbondata.hadoop.io.StringArrayWritable;
-
-import junit.framework.TestCase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.compress.BZip2Codec;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.hadoop.io.compress.Lz4Codec;
-import org.apache.hadoop.io.compress.SnappyCodec;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public class CSVInputFormatTest extends TestCase {
-
- /**
- * generate compressed files, no need to call this method.
- * @throws Exception
- */
- public void generateCompressFiles() throws Exception {
- String pwd = new File("src/test/resources").getCanonicalPath();
- String inputFile = pwd + "/data.csv";
- FileInputStream input = new FileInputStream(inputFile);
- Configuration conf = new Configuration();
-
- // .gz
- String outputFile = pwd + "/data.csv.gz";
- FileOutputStream output = new FileOutputStream(outputFile);
- GzipCodec gzip = new GzipCodec();
- gzip.setConf(conf);
- CompressionOutputStream outputStream = gzip.createOutputStream(output);
- int i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- // .bz2
- input = new FileInputStream(inputFile);
- outputFile = pwd + "/data.csv.bz2";
- output = new FileOutputStream(outputFile);
- BZip2Codec bzip2 = new BZip2Codec();
- bzip2.setConf(conf);
- outputStream = bzip2.createOutputStream(output);
- i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- // .snappy
- input = new FileInputStream(inputFile);
- outputFile = pwd + "/data.csv.snappy";
- output = new FileOutputStream(outputFile);
- SnappyCodec snappy = new SnappyCodec();
- snappy.setConf(conf);
- outputStream = snappy.createOutputStream(output);
- i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- //.lz4
- input = new FileInputStream(inputFile);
- outputFile = pwd + "/data.csv.lz4";
- output = new FileOutputStream(outputFile);
- Lz4Codec lz4 = new Lz4Codec();
- lz4.setConf(conf);
- outputStream = lz4.createOutputStream(output);
- i = -1;
- while ((i = input.read()) != -1) {
- outputStream.write(i);
- }
- outputStream.close();
- input.close();
-
- }
-
- /**
- * CSVCheckMapper check the content of csv files.
- */
- public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
- NullWritable> {
- @Override
- protected void map(NullWritable key, StringArrayWritable value, Context context)
- throws IOException, InterruptedException {
- String[] columns = value.get();
- int id = Integer.parseInt(columns[0]);
- int salary = Integer.parseInt(columns[6]);
- Assert.assertEquals(id - 1, salary - 15000);
- }
- }
-
- /**
- * test read csv files
- * @throws Exception
- */
- @Test public void testReadCSVFiles() throws Exception{
- Configuration conf = new Configuration();
- prepareConf(conf);
- Job job = Job.getInstance(conf, "CSVInputFormat_normal");
- job.setJarByClass(CSVInputFormatTest.class);
- job.setMapperClass(CSVCheckMapper.class);
- job.setNumReduceTasks(0);
- job.setInputFormatClass(CSVInputFormat.class);
-
- String inputFolder = new File("src/test/resources").getCanonicalPath();
- FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
- FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
- FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
- // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
- // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
-
- File output = new File("target/output_CSVInputFormatTest");
- deleteOutput(output);
- FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
-
- Assert.assertTrue(job.waitForCompletion(true));
- }
-
- private void prepareConf(Configuration conf) {
- conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
- }
-
- private void deleteOutput(File output) {
- if (output.exists()) {
- if (output.isDirectory()) {
- for(File file : output.listFiles()) {
- deleteOutput(file);
- }
- output.delete();
- } else {
- output.delete();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.bz2
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.bz2 b/hadoop/src/test/resources/data.csv.bz2
deleted file mode 100644
index 72ea1b0..0000000
Binary files a/hadoop/src/test/resources/data.csv.bz2 and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.gz
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.gz b/hadoop/src/test/resources/data.csv.gz
deleted file mode 100644
index a523280..0000000
Binary files a/hadoop/src/test/resources/data.csv.gz and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.lz4
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.lz4 b/hadoop/src/test/resources/data.csv.lz4
deleted file mode 100644
index 5288e02..0000000
Binary files a/hadoop/src/test/resources/data.csv.lz4 and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/hadoop/src/test/resources/data.csv.snappy
----------------------------------------------------------------------
diff --git a/hadoop/src/test/resources/data.csv.snappy b/hadoop/src/test/resources/data.csv.snappy
deleted file mode 100644
index ff8c8f0..0000000
Binary files a/hadoop/src/test/resources/data.csv.snappy and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 9ed0913..cb2bd3e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,8 +41,8 @@ import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
-import org.apache.carbondata.hadoop.csv.recorditerator.RecordReaderIterator
+import org.apache.carbondata.processing.csvload.CSVInputFormat
+import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator
import org.apache.carbondata.processing.csvreaderstep.BlockDetails
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.DataLoadExecutor
@@ -207,7 +207,7 @@ class NewCarbonDataLoadRDD[K, V](
val readers =
split.partitionBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
readers.zipWithIndex.map { case (reader, index) =>
- new RecordReaderIterator(reader,
+ new CSVRecordReaderIterator(reader,
split.partitionBlocksDetail(index),
hadoopAttemptContext)
}
@@ -233,7 +233,7 @@ class NewCarbonDataLoadRDD[K, V](
val readers =
split.nodeBlocksDetail.map(format.createRecordReader(_, hadoopAttemptContext))
readers.zipWithIndex.map { case (reader, index) =>
- new RecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
+ new CSVRecordReaderIterator(reader, split.nodeBlocksDetail(index), hadoopAttemptContext)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 2d62fe6..7592e4e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.processing.csvload.CSVInputFormat
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 09c049d..ef759cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -50,8 +50,8 @@ import org.apache.carbondata.core.service.CarbonCommonFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.hadoop.csv.CSVInputFormat
-import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.csvload.CSVInputFormat
+import org.apache.carbondata.processing.csvload.StringArrayWritable
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.CarbonSparkFactory
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index d7fce90..7333115 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -25,13 +25,13 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.unsafe.types.UTF8String;
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
+public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> {
@Override public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index 652f56c..b848543 100644
--- a/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -21,12 +21,12 @@ import java.io.IOException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<InternalRow> {
+public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<InternalRow> {
@Override public void initialize(CarbonColumn[] carbonColumns,
AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
new file mode 100644
index 0000000..9f80c07
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/BoundedInputStream.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Customized reader class to read data from a file
+ * until the given upper byte threshold is reached.
+ */
+public class BoundedInputStream extends InputStream {
+
+ /**
+ * byte value of the new line character
+ */
+ private static final byte END_OF_LINE_BYTE_VALUE = '\n';
+
+ /**
+ * number of extra characters to read
+ */
+ private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
+
+ /**
+ * number of bytes remaining
+ */
+ private long remaining;
+ /**
+ * to check whether end of line is found
+ */
+ private boolean endOfLineFound = false;
+
+ private DataInputStream in;
+
+ public BoundedInputStream(DataInputStream in, long limit) {
+ this.in = in;
+ this.remaining = limit;
+ }
+
+ /**
+ * Reads a single byte from the file, decrementing the remaining count.
+ *
+ * @throws IOException
+ * problem while reading
+ */
+ @Override
+ public int read() throws IOException {
+ if (this.remaining == 0) {
+ return -1;
+ } else {
+ int byteRead = this.in.read();
+ if (byteRead >= 0) {
+ --this.remaining;
+ }
+
+ return byteRead;
+ }
+ }
+
+ /**
+ * Reads data from the file into the given buffer. Once the byte limit is
+ * reached, it keeps reading until the next new line character so that the
+ * last record is not truncated.
+ *
+ * @param buffer
+ * buffer into which data will be read
+ * @param offset
+ * position in the buffer from which filling starts
+ * @param length
+ * maximum number of bytes to read
+ * @throws IOException
+ * problem while reading
+ */
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (this.remaining == 0) {
+ return -1;
+ } else {
+ if (this.remaining < length) {
+ length = (int) this.remaining;
+ }
+
+ length = this.in.read(buffer, offset, length);
+ if (length >= 0) {
+ this.remaining -= length;
+ if (this.remaining == 0 && !endOfLineFound) {
+ endOfLineFound = true;
+ this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+ } else if (endOfLineFound) {
+ int end = offset + length;
+ for (int i = offset; i < end; i++) {
+ if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
+ this.remaining = 0;
+ return (i - offset) + 1;
+ }
+ }
+ this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+ }
+ }
+ return length;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ }
+
+ public long getRemaining() {
+ return this.remaining;
+ }
+
+}
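As a quick usage sketch (not part of this patch): a caller hands BoundedInputStream a positioned stream and a byte limit; reads stop at the limit unless it falls mid-record, in which case the stream reads on to the next newline so the last record stays whole. The file path and limit below are illustrative assumptions.

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.carbondata.processing.csvload.BoundedInputStream;

    public class BoundedInputStreamDemo {
      public static void main(String[] args) throws IOException {
        // Hypothetical split: read roughly the first 1024 bytes of a local CSV,
        // then continue to the end of the current line.
        DataInputStream in = new DataInputStream(new FileInputStream("/tmp/data.csv"));
        BoundedInputStream bounded = new BoundedInputStream(in, 1024);
        byte[] buffer = new byte[256];
        int read;
        while ((read = bounded.read(buffer, 0, buffer.length)) != -1) {
          System.out.write(buffer, 0, read);
        }
        System.out.flush();
        bounded.close();
      }
    }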
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
new file mode 100644
index 0000000..f38175d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVInputFormat.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import com.univocity.parsers.csv.CsvParser;
+import com.univocity.parsers.csv.CsvParserSettings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CodecPool;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.hadoop.io.compress.SplittableCompressionCodec;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.LineReader;
+
+/**
+ * An {@link org.apache.hadoop.mapreduce.InputFormat} for CSV files. Files are broken into lines;
+ * each value holds the parsed fields of one line, and the key is always null.
+ */
+public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWritable> {
+
+ public static final String DELIMITER = "carbon.csvinputformat.delimiter";
+ public static final String DELIMITER_DEFAULT = ",";
+ public static final String COMMENT = "carbon.csvinputformat.comment";
+ public static final String COMMENT_DEFAULT = "#";
+ public static final String QUOTE = "carbon.csvinputformat.quote";
+ public static final String QUOTE_DEFAULT = "\"";
+ public static final String ESCAPE = "carbon.csvinputformat.escape";
+ public static final String ESCAPE_DEFAULT = "\\";
+ public static final String HEADER_PRESENT = "carbon.csvinputformat.header.present";
+ public static final boolean HEADER_PRESENT_DEFAULT = false;
+ public static final String READ_BUFFER_SIZE = "carbon.csvinputformat.read.buffer.size";
+ public static final String READ_BUFFER_SIZE_DEFAULT = "65536";
+
+ @Override
+ public RecordReader<NullWritable, StringArrayWritable> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext context) throws IOException, InterruptedException {
+ return new CSVRecordReader();
+ }
+
+ @Override
+ protected boolean isSplitable(JobContext context, Path file) {
+ final CompressionCodec codec = new CompressionCodecFactory(context.getConfiguration())
+ .getCodec(file);
+ if (null == codec) {
+ return true;
+ }
+ return codec instanceof SplittableCompressionCodec;
+ }
+
+ /**
+ * Sets the comment character in the configuration. Defaults to '#'.
+ * @param configuration
+ * @param commentChar
+ */
+ public static void setCommentCharacter(Configuration configuration, String commentChar) {
+ if (commentChar != null && !commentChar.isEmpty()) {
+ configuration.set(COMMENT, commentChar);
+ }
+ }
+
+ /**
+ * Sets the field delimiter in the configuration. Defaults to ','.
+ * @param configuration
+ * @param delimiter
+ */
+ public static void setCSVDelimiter(Configuration configuration, String delimiter) {
+ if (delimiter != null && !delimiter.isEmpty()) {
+ configuration.set(DELIMITER, delimiter);
+ }
+ }
+
+ /**
+ * Sets the escape character in the configuration. Defaults to '\'.
+ * @param configuration
+ * @param escapeCharacter
+ */
+ public static void setEscapeCharacter(Configuration configuration, String escapeCharacter) {
+ if (escapeCharacter != null && !escapeCharacter.isEmpty()) {
+ configuration.set(ESCAPE, escapeCharacter);
+ }
+ }
+
+ /**
+ * Sets whether the header row should be extracted from the CSV. Defaults to false.
+ * @param configuration
+ * @param headerExtractEnable
+ */
+ public static void setHeaderExtractionEnabled(Configuration configuration,
+ boolean headerExtractEnable) {
+ configuration.set(HEADER_PRESENT, String.valueOf(headerExtractEnable));
+ }
+
+ /**
+ * Sets the quote character in the configuration. Defaults to '"'.
+ * @param configuration
+ * @param quoteCharacter
+ */
+ public static void setQuoteCharacter(Configuration configuration, String quoteCharacter) {
+ if (quoteCharacter != null && !quoteCharacter.isEmpty()) {
+ configuration.set(QUOTE, quoteCharacter);
+ }
+ }
+
+ /**
+ * Sets the read buffer size (in bytes) in the configuration. Defaults to 65536.
+ * @param configuration
+ * @param bufferSize
+ */
+ public static void setReadBufferSize(Configuration configuration, String bufferSize) {
+ if (bufferSize != null && !bufferSize.isEmpty()) {
+ configuration.set(READ_BUFFER_SIZE, bufferSize);
+ }
+ }
+
+ /**
+ * Treats each value as the parsed fields of one line in the file; the key is always null.
+ */
+ public static class CSVRecordReader extends RecordReader<NullWritable, StringArrayWritable> {
+
+ private long start;
+ private long end;
+ private BoundedInputStream boundedInputStream;
+ private Reader reader;
+ private CsvParser csvParser;
+ private StringArrayWritable value;
+ private String[] columns;
+ private Seekable filePosition;
+ private boolean isCompressedInput;
+ private Decompressor decompressor;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ FileSplit split = (FileSplit) inputSplit;
+ start = split.getStart();
+ end = start + split.getLength();
+ Path file = split.getPath();
+ Configuration job = context.getConfiguration();
+ CompressionCodec codec = (new CompressionCodecFactory(job)).getCodec(file);
+ FileSystem fs = file.getFileSystem(job);
+ int bufferSize = Integer.parseInt(job.get(READ_BUFFER_SIZE, READ_BUFFER_SIZE_DEFAULT));
+ FSDataInputStream fileIn = fs.open(file, bufferSize);
+ InputStream inputStream;
+ if (codec != null) {
+ isCompressedInput = true;
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream scIn = ((SplittableCompressionCodec) codec)
+ .createInputStream(fileIn, decompressor, start, end, SplittableCompressionCodec
+ .READ_MODE.BYBLOCK);
+ start = scIn.getAdjustedStart();
+ end = scIn.getAdjustedEnd();
+ if (start != 0) {
+ LineReader lineReader = new LineReader(scIn, 1);
+ start += lineReader.readLine(new Text(), 0);
+ }
+ filePosition = scIn;
+ inputStream = scIn;
+ } else {
+ CompressionInputStream cIn = codec.createInputStream(fileIn, decompressor);
+ filePosition = cIn;
+ inputStream = cIn;
+ }
+ } else {
+ fileIn.seek(start);
+ if (start != 0) {
+ LineReader lineReader = new LineReader(fileIn, 1);
+ start += lineReader.readLine(new Text(), 0);
+ }
+ boundedInputStream = new BoundedInputStream(fileIn, end - start);
+ filePosition = fileIn;
+ inputStream = boundedInputStream;
+ }
+ reader = new InputStreamReader(inputStream);
+ csvParser = new CsvParser(extractCsvParserSettings(job));
+ csvParser.beginParsing(reader);
+ }
+
+ private CsvParserSettings extractCsvParserSettings(Configuration job) {
+ CsvParserSettings parserSettings = new CsvParserSettings();
+ parserSettings.getFormat().setDelimiter(job.get(DELIMITER, DELIMITER_DEFAULT).charAt(0));
+ parserSettings.getFormat().setComment(job.get(COMMENT, COMMENT_DEFAULT).charAt(0));
+ parserSettings.setLineSeparatorDetectionEnabled(true);
+ parserSettings.setNullValue("");
+ parserSettings.setIgnoreLeadingWhitespaces(false);
+ parserSettings.setIgnoreTrailingWhitespaces(false);
+ parserSettings.setSkipEmptyLines(false);
+ // TODO get from csv file.
+ parserSettings.setMaxColumns(1000);
+ parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
+ parserSettings.getFormat().setQuoteEscape(job.get(ESCAPE, ESCAPE_DEFAULT).charAt(0));
+ if (start == 0) {
+ parserSettings.setHeaderExtractionEnabled(job.getBoolean(HEADER_PRESENT,
+ HEADER_PRESENT_DEFAULT));
+ }
+ return parserSettings;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ columns = csvParser.parseNext();
+ if (columns == null) {
+ value = null;
+ return false;
+ }
+ if (value == null) {
+ value = new StringArrayWritable();
+ }
+ value.set(columns);
+ return true;
+ }
+
+ @Override
+ public NullWritable getCurrentKey() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public StringArrayWritable getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ private long getPos() throws IOException {
+ long retVal = start;
+ if (null != boundedInputStream) {
+ retVal = end - boundedInputStream.getRemaining();
+ } else if (isCompressedInput && null != filePosition) {
+ retVal = filePosition.getPos();
+ }
+ return retVal;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return start == end ? 0.0F : Math.min(1.0F, (float) (getPos() -
+ start) / (float) (end - start));
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (reader != null) {
+ reader.close();
+ }
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ }
+ }
+ }
+ }
+}
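For orientation, a minimal sketch of wiring CSVInputFormat into a mapreduce Job via the setter helpers above; unset keys fall back to the *_DEFAULT values declared on the class. The job name and input path are placeholder assumptions.

    import org.apache.carbondata.processing.csvload.CSVInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class CSVJobSetup {
      public static Job buildJob() throws Exception {
        Configuration conf = new Configuration();
        // Parser options; any setter may be skipped to keep its default.
        CSVInputFormat.setCSVDelimiter(conf, ",");
        CSVInputFormat.setQuoteCharacter(conf, "\"");
        CSVInputFormat.setEscapeCharacter(conf, "\\");
        CSVInputFormat.setHeaderExtractionEnabled(conf, true);
        CSVInputFormat.setReadBufferSize(conf, "65536");

        Job job = Job.getInstance(conf, "csv-read");
        job.setInputFormatClass(CSVInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/tmp/data.csv")); // illustrative path
        return job;
      }
    }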
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
new file mode 100644
index 0000000..10a036a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/CSVRecordReaderIterator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.csvload;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A wrapper iterator around a {@link RecordReader}.
+ */
+public class CSVRecordReaderIterator extends CarbonIterator<Object[]> {
+
+ private RecordReader<NullWritable, StringArrayWritable> recordReader;
+
+ /**
+ * Flag that adapts the record reader to iterator semantics. hasNext() cannot normally be
+ * called multiple times on a record reader, since each call advances by one line. With this
+ * flag, hasNext() only reports whether a next row is present, and next() consumes that row
+ * and advances the pointer.
+ */
+ private boolean isConsumed;
+
+ private InputSplit split;
+
+ private TaskAttemptContext context;
+
+ public CSVRecordReaderIterator(RecordReader<NullWritable, StringArrayWritable> recordReader,
+ InputSplit split, TaskAttemptContext context) {
+ this.recordReader = recordReader;
+ this.split = split;
+ this.context = context;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ if (!isConsumed) {
+ isConsumed = recordReader.nextKeyValue();
+ return isConsumed;
+ }
+ return true;
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ @Override
+ public Object[] next() {
+ try {
+ String[] data = recordReader.getCurrentValue().get();
+ isConsumed = false;
+ return data;
+ } catch (Exception e) {
+ throw new CarbonDataLoadingException(e);
+ }
+ }
+
+ @Override
+ public void initialize() {
+ try {
+ recordReader.initialize(split, context);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ recordReader.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
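A minimal consumption sketch, assuming the split and task context come from the surrounding load framework (both are placeholders here): hasNext() can be polled repeatedly, and next() consumes one parsed row.

    import java.util.Arrays;

    import org.apache.carbondata.processing.csvload.CSVInputFormat;
    import org.apache.carbondata.processing.csvload.CSVRecordReaderIterator;
    import org.apache.carbondata.processing.csvload.StringArrayWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class IteratorUsage {
      static void consume(InputSplit split, TaskAttemptContext context) throws Exception {
        RecordReader<NullWritable, StringArrayWritable> reader =
            new CSVInputFormat().createRecordReader(split, context);
        CSVRecordReaderIterator iterator = new CSVRecordReaderIterator(reader, split, context);
        iterator.initialize(); // delegates to RecordReader.initialize(split, context)
        while (iterator.hasNext()) {
          Object[] row = iterator.next();
          System.out.println(Arrays.toString(row));
        }
        iterator.close();
      }
    }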
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
new file mode 100644
index 0000000..7eb3ec9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/StringArrayWritable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A String sequence that is usable as a key or value.
+ */
+public class StringArrayWritable implements Writable {
+ private String[] values;
+
+ public String[] toStrings() {
+ return values;
+ }
+
+ public void set(String[] values) {
+ this.values = values;
+ }
+
+ public String[] get() {
+ return values;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int length = in.readInt();
+ values = new String[length];
+ for (int i = 0; i < length; i++) {
+ byte[] b = new byte[in.readInt()];
+ in.readFully(b);
+ values[i] = new String(b, Charset.defaultCharset());
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(values.length); // write the number of values
+ for (int i = 0; i < values.length; i++) {
+ byte[] b = values[i].getBytes(Charset.defaultCharset());
+ out.writeInt(b.length);
+ out.write(b);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return Arrays.toString(values);
+ }
+}
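A round-trip sketch of the wire format write() and readFields() agree on: an int count, then per value an int byte length followed by the bytes (in-memory streams only, for illustration).

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.carbondata.processing.csvload.StringArrayWritable;

    public class StringArrayWritableRoundTrip {
      public static void main(String[] args) throws IOException {
        StringArrayWritable written = new StringArrayWritable();
        written.set(new String[] {"1", "alice", "15000"});

        // Serialize to an in-memory buffer.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        written.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance and verify the round trip.
        StringArrayWritable read = new StringArrayWritable();
        read.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(read); // prints [1, alice, 15000]
      }
    }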
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d6ceb1d3/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
new file mode 100644
index 0000000..66aedb6
--- /dev/null
+++ b/processing/src/test/java/org/apache/carbondata/processing/csvload/CSVInputFormatTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.csvload;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.io.compress.Lz4Codec;
+import org.apache.hadoop.io.compress.SnappyCodec;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class CSVInputFormatTest extends TestCase {
+
+ /**
+ * Generates the compressed test files; run manually to regenerate the resources, not invoked by the tests.
+ * @throws Exception
+ */
+ public void generateCompressFiles() throws Exception {
+ String pwd = new File("src/test/resources/csv").getCanonicalPath();
+ String inputFile = pwd + "/data.csv";
+ FileInputStream input = new FileInputStream(inputFile);
+ Configuration conf = new Configuration();
+
+ // .gz
+ String outputFile = pwd + "/data.csv.gz";
+ FileOutputStream output = new FileOutputStream(outputFile);
+ GzipCodec gzip = new GzipCodec();
+ gzip.setConf(conf);
+ CompressionOutputStream outputStream = gzip.createOutputStream(output);
+ int i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ // .bz2
+ input = new FileInputStream(inputFile);
+ outputFile = pwd + "/data.csv.bz2";
+ output = new FileOutputStream(outputFile);
+ BZip2Codec bzip2 = new BZip2Codec();
+ bzip2.setConf(conf);
+ outputStream = bzip2.createOutputStream(output);
+ i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ // .snappy
+ input = new FileInputStream(inputFile);
+ outputFile = pwd + "/data.csv.snappy";
+ output = new FileOutputStream(outputFile);
+ SnappyCodec snappy = new SnappyCodec();
+ snappy.setConf(conf);
+ outputStream = snappy.createOutputStream(output);
+ i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ //.lz4
+ input = new FileInputStream(inputFile);
+ outputFile = pwd + "/data.csv.lz4";
+ output = new FileOutputStream(outputFile);
+ Lz4Codec lz4 = new Lz4Codec();
+ lz4.setConf(conf);
+ outputStream = lz4.createOutputStream(output);
+ i = -1;
+ while ((i = input.read()) != -1) {
+ outputStream.write(i);
+ }
+ outputStream.close();
+ input.close();
+
+ }
+
+ /**
+ * CSVCheckMapper checks the content of the CSV files.
+ */
+ public static class CSVCheckMapper extends Mapper<NullWritable, StringArrayWritable, NullWritable,
+ NullWritable> {
+ @Override
+ protected void map(NullWritable key, StringArrayWritable value, Context context)
+ throws IOException, InterruptedException {
+ String[] columns = value.get();
+ int id = Integer.parseInt(columns[0]);
+ int salary = Integer.parseInt(columns[6]);
+ Assert.assertEquals(id - 1, salary - 15000);
+ }
+ }
+
+ /**
+ * Tests reading CSV files, both plain and compressed.
+ * @throws Exception
+ */
+ @Test public void testReadCSVFiles() throws Exception {
+ Configuration conf = new Configuration();
+ prepareConf(conf);
+ Job job = Job.getInstance(conf, "CSVInputFormat_normal");
+ job.setJarByClass(CSVInputFormatTest.class);
+ job.setMapperClass(CSVCheckMapper.class);
+ job.setNumReduceTasks(0);
+ job.setInputFormatClass(CSVInputFormat.class);
+
+ String inputFolder = new File("src/test/resources/csv").getCanonicalPath();
+ FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv"));
+ FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.bz2"));
+ FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.gz"));
+ // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.lz4"));
+ // FileInputFormat.addInputPath(job, new Path(inputFolder + File.separator + "data.csv.snappy"));
+
+ File output = new File("target/output_CSVInputFormatTest");
+ deleteOutput(output);
+ FileOutputFormat.setOutputPath(job, new Path(output.getCanonicalPath()));
+
+ Assert.assertTrue(job.waitForCompletion(true));
+ }
+
+ private void prepareConf(Configuration conf) {
+ conf.setBoolean(CSVInputFormat.HEADER_PRESENT, true);
+ }
+
+ private void deleteOutput(File output) {
+ if (output.exists()) {
+ if (output.isDirectory()) {
+ for (File file : output.listFiles()) {
+ deleteOutput(file);
+ }
+ output.delete();
+ } else {
+ output.delete();
+ }
+ }
+ }
+}