You are viewing a plain-text version of this content. (The canonical link to the original page was a hyperlink and is not preserved in this text version.)
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/28 10:07:51 UTC
[1/3] git commit: TAJO-332: Invalid row count of CSVScanner. (jinho)
Updated Branches:
refs/heads/DAG-execplan 7c97735e1 -> 6532324b0
TAJO-332: Invalid row count of CSVScanner. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/b8435e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/b8435e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/b8435e7c
Branch: refs/heads/DAG-execplan
Commit: b8435e7c2dd9999023cca6040b33427dd886c2ae
Parents: 67e0d94
Author: jinossy <ji...@gmail.com>
Authored: Wed Nov 27 12:56:55 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Wed Nov 27 12:56:55 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../main/java/org/apache/tajo/util/Bytes.java | 46 +-
.../java/org/apache/tajo/storage/CSVFile.java | 378 +++++--------
.../java/org/apache/tajo/storage/LazyTuple.java | 76 +--
.../org/apache/tajo/storage/LineReader.java | 555 +++++++++++++++++++
.../tajo/storage/TextSerializeDeserialize.java | 4 +-
.../tajo/storage/TestCompressionStorages.java | 2 +-
7 files changed, 721 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1a824d..f60ee90 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,8 @@ Release 0.8.0 - unreleased
BUG FIXES
+ TAJO-332: Invalid row count of CSVScanner. (jinho)
+
TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)
TAJO-296: Late registration of Tajo workers. (hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
index 299ed4c..cfabf21 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
@@ -26,10 +26,7 @@ import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import sun.misc.Unsafe;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -1413,19 +1410,19 @@ public class Bytes {
}
public static byte[][] splitPreserveAllTokens(byte[] str, char separatorChar, int[] target) {
- return splitWorker(str, 0, separatorChar, true, target);
+ return splitWorker(str, 0, -1, separatorChar, true, target);
}
- public static byte[][] splitPreserveAllTokens(byte[] str, int length, char separatorChar, int[] target) {
- return splitWorker(str, length, separatorChar, true, target);
+ public static byte[][] splitPreserveAllTokens(byte[] str, int offset, int length, char separatorChar, int[] target) {
+ return splitWorker(str, offset, length, separatorChar, true, target);
}
public static byte[][] splitPreserveAllTokens(byte[] str, char separatorChar) {
- return splitWorker(str, 0, separatorChar, true, null);
+ return splitWorker(str, 0, -1, separatorChar, true, null);
}
public static byte[][] splitPreserveAllTokens(byte[] str, int length, char separatorChar) {
- return splitWorker(str, length, separatorChar, true, null);
+ return splitWorker(str, 0, length, separatorChar, true, null);
}
/**
@@ -1442,20 +1439,19 @@ public class Bytes {
* separators are treated as one separator.
* @return an array of parsed Strings, <code>null</code> if null String input
*/
- private static byte[][] splitWorker(byte[] str, int length, char separatorChar, boolean preserveAllTokens, int[] target) {
+ private static byte[][] splitWorker(byte[] str, int offset, int length, char separatorChar, boolean preserveAllTokens, int[] target) {
// Performance tuned for 2.0 (JDK1.4)
if (str == null) {
return null;
}
int len = length;
- if(len < 1){
- len = str.length;
- }
-
if (len == 0) {
return new byte[1][0];
+ }else if(len < 0){
+ len = str.length - offset;
}
+
List list = new ArrayList();
int i = 0, start = 0;
boolean match = false;
@@ -1463,15 +1459,15 @@ public class Bytes {
int currentTarget = 0;
int currentIndex = 0;
while (i < len) {
- if (str[i] == separatorChar) {
+ if (str[i + offset] == separatorChar) {
if (match || preserveAllTokens) {
if (target == null) {
byte[] bytes = new byte[i - start];
- System.arraycopy(str, start, bytes, 0, bytes.length);
+ System.arraycopy(str, start + offset, bytes, 0, bytes.length);
list.add(bytes);
} else if (target.length > currentTarget && currentIndex == target[currentTarget]) {
byte[] bytes = new byte[i - start];
- System.arraycopy(str, start, bytes, 0, bytes.length);
+ System.arraycopy(str, start + offset, bytes, 0, bytes.length);
list.add(bytes);
currentTarget++;
} else {
@@ -1491,11 +1487,11 @@ public class Bytes {
if (match || (preserveAllTokens && lastMatch)) {
if (target == null) {
byte[] bytes = new byte[i - start];
- System.arraycopy(str, start, bytes, 0, bytes.length);
+ System.arraycopy(str, start + offset, bytes, 0, bytes.length);
list.add(bytes);
} else if (target.length > currentTarget && currentIndex == target[currentTarget]) {
byte[] bytes = new byte[i - start];
- System.arraycopy(str, start, bytes, 0, bytes.length);
+ System.arraycopy(str, start + offset, bytes, 0, bytes.length);
list.add(bytes); //str.substring(start, i));
currentTarget++;
} else {
@@ -1767,4 +1763,16 @@ public class Bytes {
return toString(b, 0, n);
}
+ public static int readFully(InputStream is, byte[] buffer, int offset, int length)
+ throws IOException {
+ int nread = 0;
+ while (nread < length) {
+ int nbytes = is.read(buffer, offset + nread, length - nread);
+ if (nbytes < 0) {
+ return nread > 0 ? nread : nbytes;
+ }
+ nread += nbytes;
+ }
+ return nread;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index a5228faf..e9b6cfc 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -18,8 +18,6 @@
package org.apache.tajo.storage;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -27,29 +25,26 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.CharDatum;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
import org.apache.tajo.util.Bytes;
import java.io.*;
+import java.util.ArrayList;
import java.util.Arrays;
public class CSVFile {
- public static byte[] trueBytes = "true".getBytes();
- public static byte[] falseBytes = "false".getBytes();
public static final String DELIMITER = "csvfile.delimiter";
public static final String NULL = "csvfile.null"; //read only
@@ -62,6 +57,7 @@ public class CSVFile {
public static class CSVAppender extends FileAppender {
private final TableMeta meta;
private final Schema schema;
+ private final int columnNum;
private final FileSystem fs;
private FSDataOutputStream fos;
private DataOutputStream outputStream;
@@ -73,7 +69,12 @@ public class CSVFile {
private CompressionCodec codec;
private Path compressedPath;
private byte[] nullChars;
- private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+ private int BUFFER_SIZE = 128 * 1024;
+ private int bufferedBytes = 0;
+ private long pos = 0;
+
+ private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+ private TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
super(conf, schema, meta, path);
@@ -81,7 +82,7 @@ public class CSVFile {
this.meta = meta;
this.schema = schema;
this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0);
-
+ this.columnNum = schema.getColumnNum();
String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
if (StringUtils.isEmpty(nullCharacters)) {
nullChars = NullDatum.get().asTextBytes();
@@ -119,177 +120,89 @@ public class CSVFile {
throw new AlreadyExistsStorageException(path);
}
fos = fs.create(path);
- outputStream = fos;
+ outputStream = new DataOutputStream(new BufferedOutputStream(fos));
}
if (enabledStats) {
this.stats = new TableStatistics(this.schema);
}
+ os.reset();
+ pos = fos.getPos();
+ bufferedBytes = 0;
super.init();
}
+
@Override
public void addTuple(Tuple tuple) throws IOException {
- Column col;
Datum datum;
+ int rowBytes = 0;
- int colNum = schema.getColumnNum();
- if (tuple instanceof LazyTuple) {
- LazyTuple lTuple = (LazyTuple)tuple;
- for (int i = 0; i < colNum; i++) {
- TajoDataTypes.DataType dataType = schema.getColumn(i).getDataType();
-
- switch (dataType.getType()) {
- case TEXT: {
- datum = tuple.get(i);
- if (datum instanceof NullDatum) {
- outputStream.write(nullChars);
- } else {
- outputStream.write(datum.asTextBytes());
- }
- break;
- }
- case CHAR: {
- datum = tuple.get(i);
- if (datum instanceof NullDatum) {
- outputStream.write(nullChars);
- } else {
- byte[] pad = new byte[dataType.getLength() - datum.size()];
- outputStream.write(datum.asTextBytes());
- outputStream.write(pad);
- }
- break;
- }
- case BOOLEAN: {
- datum = tuple.get(i);
- if (datum instanceof NullDatum) {
- //null datum is zero length byte array
- } else {
- outputStream.write(datum.asBool() ? trueBytes : falseBytes); //Compatibility with Apache Hive
- }
- break;
- }
- case NULL:
- break;
- case PROTOBUF:
- datum = tuple.get(i);
- ProtobufDatum protobufDatum = (ProtobufDatum) datum;
- protobufJsonFormat.print(protobufDatum.get(), outputStream);
- break;
- default:
- outputStream.write(lTuple.getTextBytes(i)); //better usage for insertion to table of lazy tuple
- break;
- }
-
- if(colNum - 1 > i){
- outputStream.write((byte) delimiter);
- }
+ for (int i = 0; i < columnNum; i++) {
+ datum = tuple.get(i);
+ rowBytes += serializeDeserialize.serialize(schema.getColumn(i), datum, os, nullChars);
- if (enabledStats) {
- datum = tuple.get(i);
- stats.analyzeField(i, datum);
- }
+ if(columnNum - 1 > i){
+ os.write((byte) delimiter);
+ rowBytes += 1;
}
- } else {
- for (int i = 0; i < schema.getColumnNum(); i++) {
- datum = tuple.get(i);
- if (enabledStats) {
- stats.analyzeField(i, datum);
- }
- if (datum instanceof NullDatum) {
- outputStream.write(nullChars);
- } else {
- col = schema.getColumn(i);
- switch (col.getDataType().getType()) {
- case BOOLEAN:
- outputStream.write(tuple.getBoolean(i).asBool() ? trueBytes : falseBytes); //Compatibility with Apache Hive
- break;
- case BIT:
- outputStream.write(tuple.getByte(i).asTextBytes());
- break;
- case BLOB:
- outputStream.write(Base64.encodeBase64(tuple.getBytes(i).asByteArray(), false));
- break;
- case CHAR:
- CharDatum charDatum = tuple.getChar(i);
- byte[] pad = new byte[col.getDataType().getLength() - datum.size()];
- outputStream.write(charDatum.asTextBytes());
- outputStream.write(pad);
- break;
- case TEXT:
- outputStream.write(tuple.getText(i).asTextBytes());
- break;
- case INT2:
- outputStream.write(tuple.getShort(i).asTextBytes());
- break;
- case INT4:
- outputStream.write(tuple.getInt(i).asTextBytes());
- break;
- case INT8:
- outputStream.write(tuple.getLong(i).asTextBytes());
- break;
- case FLOAT4:
- outputStream.write(tuple.getFloat(i).asTextBytes());
- break;
- case FLOAT8:
- outputStream.write(tuple.getDouble(i).asTextBytes());
- break;
- case INET4:
- outputStream.write(tuple.getIPv4(i).asTextBytes());
- break;
- case INET6:
- outputStream.write(tuple.getIPv6(i).toString().getBytes());
- break;
- case PROTOBUF:
- ProtobufDatum protobuf = (ProtobufDatum) datum;
- ProtobufJsonFormat.getInstance().print(protobuf.get(), outputStream);
- break;
- default:
- throw new UnsupportedOperationException("Cannot write such field: "
- + tuple.get(i).type());
- }
- }
- if(colNum - 1 > i){
- outputStream.write((byte) delimiter);
- }
+ if (enabledStats) {
+ stats.analyzeField(i, datum);
}
}
+ os.write(LF);
+ rowBytes += 1;
+
+ pos += rowBytes;
+ bufferedBytes += rowBytes;
+ if(bufferedBytes > BUFFER_SIZE){
+ flushBuffer();
+ }
// Statistical section
- outputStream.write('\n');
if (enabledStats) {
stats.incrementRow();
}
}
+ private void flushBuffer() throws IOException {
+ if(os.getLength() > 0) {
+ os.writeTo(outputStream);
+ os.reset();
+ bufferedBytes = 0;
+ }
+ }
@Override
public long getOffset() throws IOException {
- return fos.getPos();
+ return pos;
}
@Override
public void flush() throws IOException {
+ flushBuffer();
outputStream.flush();
}
@Override
public void close() throws IOException {
- // Statistical section
- if (enabledStats) {
- stats.setNumBytes(getOffset());
- }
try {
flush();
+ // Statistical section
+ if (enabledStats) {
+ stats.setNumBytes(getOffset());
+ }
+
if(deflateFilter != null) {
deflateFilter.finish();
deflateFilter.resetState();
deflateFilter = null;
}
- fos.close();
+ os.close();
} finally {
+ IOUtils.cleanup(LOG, fos);
if (compressor != null) {
CodecPool.returnCompressor(compressor);
compressor = null;
@@ -322,11 +235,10 @@ public class CSVFile {
factory = new CompressionCodecFactory(conf);
codec = factory.getCodec(fragment.getPath());
if (codec == null || codec instanceof SplittableCompressionCodec) {
- splittable = true;
+ splittable = true;
}
- // Buffer size, Delimiter
- this.bufSize = DEFAULT_BUFFER_SIZE;
+ //Delimiter
String delim = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
@@ -339,7 +251,6 @@ public class CSVFile {
}
private final static int DEFAULT_BUFFER_SIZE = 128 * 1024;
- private int bufSize;
private char delimiter;
private FileSystem fs;
private FSDataInputStream fis;
@@ -349,28 +260,28 @@ public class CSVFile {
private Decompressor decompressor;
private Seekable filePosition;
private boolean splittable = false;
- private long startOffset, length;
- private byte[] buf = null;
- private byte[][] tuples = null;
- private long[] tupleOffsets = null;
+ private long startOffset, length, end, pos;
private int currentIdx = 0, validIdx = 0;
- private byte[] tail = null;
- private long pageStart = -1;
- private long prevTailLen = -1;
private int[] targetColumnIndexes;
private boolean eof = false;
private final byte[] nullChars;
+ private LineReader reader;
+ private ArrayList<Long> fileOffsets = new ArrayList<Long>();
+ private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
+ private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
+ private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
@Override
public void init() throws IOException {
// FileFragment information
- fs = fragment.getPath().getFileSystem(conf);
- fis = fs.open(fragment.getPath());
- startOffset = fragment.getStartKey();
- length = fragment.getEndKey();
+ if(fs == null) fs = fragment.getPath().getFileSystem(conf);
+ if(fis == null) fis = fs.open(fragment.getPath());
- if(startOffset > 0) startOffset--; // prev line feed
+ pos = startOffset = fragment.getStartKey();
+ length = fragment.getEndKey();
+ end = startOffset + length;
+ fis.seek(startOffset);
if (codec != null) {
decompressor = CodecPool.getDecompressor(codec);
@@ -385,14 +296,14 @@ public class CSVFile {
is = cIn;
} else {
is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ filePosition = fis;
}
} else {
- fis.seek(startOffset);
filePosition = fis;
is = fis;
}
- tuples = new byte[0][];
+ reader = new LineReader(is, DEFAULT_BUFFER_SIZE);
if (targets == null) {
targets = schema.toArray();
}
@@ -410,103 +321,61 @@ public class CSVFile {
}
if (startOffset != 0) {
- int rbyte;
- while ((rbyte = is.read()) != LF) {
- if(rbyte == EOF) break;
- }
- }
-
- if (fragmentable() < 1) {
- close();
- return;
+ pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
}
+ eof = false;
page();
}
+ private int maxBytesToConsume(long pos) {
+ return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
+ }
+
private long fragmentable() throws IOException {
- return startOffset + length - getFilePosition();
+ return end - getFilePosition();
}
private long getFilePosition() throws IOException {
long retVal;
- if (filePosition != null) {
+ if (isCompress()) {
retVal = filePosition.getPos();
} else {
- retVal = fis.getPos();
+ retVal = pos;
}
return retVal;
}
private void page() throws IOException {
- // Index initialization
+// // Index initialization
currentIdx = 0;
+ validIdx = 0;
+ int currentBufferPos = 0;
+ int bufferedSize = 0;
- // Buffer size set
- if (isSplittable() && fragmentable() < DEFAULT_BUFFER_SIZE) {
- bufSize = (int)fragmentable();
- }
-
- if (this.tail == null || this.tail.length == 0) {
- this.pageStart = getFilePosition();
- this.prevTailLen = 0;
- } else {
- this.pageStart = getFilePosition() - this.tail.length;
- this.prevTailLen = this.tail.length;
- }
+ buffer.reset();
+ startOffsets.clear();
+ rowLengthList.clear();
+ fileOffsets.clear();
- // Read
- int rbyte;
- buf = new byte[bufSize];
- rbyte = is.read(buf);
+ if(eof) return;
- if (prevTailLen == 0) {
- if(rbyte == EOF){
- eof = true; //EOF
- return;
- }
+ while (DEFAULT_BUFFER_SIZE > bufferedSize){
- tail = new byte[0];
- tuples = Bytes.splitPreserveAllTokens(buf, rbyte, (char) LF);
- } else {
- byte[] lastRow = ArrayUtils.addAll(tail, buf);
- tuples = Bytes.splitPreserveAllTokens(lastRow, rbyte + tail.length, (char) LF);
- tail = null;
- }
-
- // Check tail
- if ((char) buf[rbyte - 1] != LF) {
- // splittable bzip2 compression returned 1 byte when sync maker found
- if (isSplittable() && (fragmentable() < 1 || rbyte != bufSize)) {
- int lineFeedPos = 0;
- byte[] temp = new byte[DEFAULT_BUFFER_SIZE];
-
- // find line feed
- while ((temp[lineFeedPos] = (byte)is.read()) != (byte)LF) {
- lineFeedPos++;
- }
-
- tuples[tuples.length - 1] = ArrayUtils.addAll(tuples[tuples.length - 1],
- ArrayUtils.subarray(temp, 0, lineFeedPos));
- validIdx = tuples.length;
+ int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ if(ret <= 0){
+ eof = true;
+ break;
} else {
- tail = tuples[tuples.length - 1];
- validIdx = tuples.length - 1;
+ fileOffsets.add(pos);
+ pos += ret;
+ startOffsets.add(currentBufferPos);
+ currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
+ bufferedSize += ret;
+ validIdx++;
}
- } else {
- tail = new byte[0];
- validIdx = tuples.length - 1; //remove last empty row ( .... \n .... \n length is 3)
- }
- if(!isCompress()) makeTupleOffset();
- }
-
- private void makeTupleOffset() {
- long curTupleOffset = 0;
- this.tupleOffsets = new long[this.validIdx];
- for (int i = 0; i < this.validIdx; i++) {
- this.tupleOffsets[i] = curTupleOffset + this.pageStart;
- curTupleOffset += this.tuples[i].length + 1;//tuple byte + 1byte line feed
+ if(isSplittable() && getFilePosition() > end) break;
}
}
@@ -514,31 +383,31 @@ public class CSVFile {
public Tuple next() throws IOException {
try {
if (currentIdx == validIdx) {
- if (isSplittable() && fragmentable() < 1) {
- close();
+ if (isSplittable() && fragmentable() <= 0) {
return null;
} else {
page();
- }
- if(eof){
- close();
- return null;
+ if(currentIdx == validIdx){
+ return null;
+ }
}
}
long offset = -1;
if(!isCompress()){
- offset = this.tupleOffsets[currentIdx];
+ offset = fileOffsets.get(currentIdx);
}
- byte[][] cells = Bytes.splitPreserveAllTokens(tuples[currentIdx++], delimiter, targetColumnIndexes);
+ byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
+ rowLengthList.get(currentIdx), delimiter, targetColumnIndexes);
+ currentIdx++;
return new LazyTuple(schema, cells, offset, nullChars);
} catch (Throwable t) {
- LOG.error("Tuple list length: " + (tuples != null ? tuples.length : 0), t);
+ LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
LOG.error("Tuple list current index: " + currentIdx, t);
+ throw new IOException(t);
}
- return null;
}
private boolean isCompress() {
@@ -547,13 +416,20 @@ public class CSVFile {
@Override
public void reset() throws IOException {
+ if (decompressor != null) {
+ decompressor.reset();
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+
init();
}
@Override
public void close() throws IOException {
try {
- is.close();
+ IOUtils.cleanup(LOG, is, fis);
+ fs = null;
} finally {
if (decompressor != null) {
decompressor.reset();
@@ -581,26 +457,25 @@ public class CSVFile {
public void seek(long offset) throws IOException {
if(isCompress()) throw new UnsupportedException();
- int tupleIndex = Arrays.binarySearch(this.tupleOffsets, offset);
+ int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
+
if (tupleIndex > -1) {
this.currentIdx = tupleIndex;
- } else if (isSplittable() && offset >= this.pageStart + this.bufSize
- + this.prevTailLen - this.tail.length || offset <= this.pageStart) {
- filePosition.seek(offset);
- tail = new byte[0];
- buf = new byte[DEFAULT_BUFFER_SIZE];
- bufSize = DEFAULT_BUFFER_SIZE;
+ } else if (isSplittable() && end >= offset || startOffset <= offset) {
+ eof = false;
+ fis.seek(offset);
+ pos = offset;
+ reader.reset();
this.currentIdx = 0;
this.validIdx = 0;
// pageBuffer();
} else {
throw new IOException("invalid offset " +
- " < pageStart : " + this.pageStart + " , " +
- " pagelength : " + this.bufSize + " , " +
- " tail lenght : " + this.tail.length +
+ " < start : " + startOffset + " , " +
+ " end : " + end + " , " +
+ " filePos : " + filePosition.getPos() + " , " +
" input offset : " + offset + " >");
}
-
}
@Override
@@ -608,13 +483,14 @@ public class CSVFile {
if(isCompress()) throw new UnsupportedException();
if (this.currentIdx == this.validIdx) {
- if (fragmentable() < 1) {
+ if (fragmentable() <= 0) {
return -1;
} else {
page();
+ if(currentIdx == validIdx) return -1;
}
}
- return this.tupleOffsets[currentIdx];
+ return fileOffsets.get(currentIdx);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index a395ea4..50bc65c 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -18,14 +18,9 @@
package org.apache.tajo.storage;
-import com.google.protobuf.Message;
-import org.apache.commons.codec.binary.Base64;
import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.*;
import org.apache.tajo.datum.exception.InvalidCastException;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.util.Bytes;
import java.io.IOException;
import java.net.InetAddress;
@@ -37,6 +32,7 @@ public class LazyTuple implements Tuple {
private byte[][] textBytes;
private Schema schema;
private byte[] nullBytes;
+ private static TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
this(schema, textBytes, offset, NullDatum.get().asTextBytes());
@@ -123,7 +119,12 @@ public class LazyTuple implements Tuple {
else if (textBytes.length <= fieldId) {
values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,")
} else if (textBytes[fieldId] != null) {
- values[fieldId] = createByTextBytes(schema.getColumn(fieldId).getDataType(), textBytes[fieldId]);
+ try {
+ values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
+ textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
+ } catch (IOException e) {
+ values[fieldId] = NullDatum.get();
+ }
textBytes[fieldId] = null;
} else {
//non-projection
@@ -285,67 +286,4 @@ public class LazyTuple implements Tuple {
}
return false;
}
-
-
- public boolean isNull(byte[] val){
- return val == null || val.length == 0 || ((val.length == nullBytes.length) && Bytes.equals(val, nullBytes));
- }
-
- public boolean isNullText(byte[] val){
- return val == null || (val.length > 0 && val.length == nullBytes.length && Bytes.equals(val, nullBytes));
- }
-
- public boolean isNotNull(byte[] val){
- return !isNull(val);
- }
-
- public boolean isNotNullText(byte[] val){
- return !isNullText((val));
- }
-
- private Datum createByTextBytes(TajoDataTypes.DataType type, byte [] val) {
- switch (type.getType()) {
- case BOOLEAN:
- return isNotNull(val) ? DatumFactory.createBool(val[0] == 't' || val[0] == 'T') : NullDatum.get();
- case INT2:
- return isNotNull(val) ? DatumFactory.createInt2(new String(val)) : NullDatum.get();
- case INT4:
- return isNotNull(val) ? DatumFactory.createInt4(new String(val)) : NullDatum.get();
- case INT8:
- return isNotNull(val) ? DatumFactory.createInt8(new String(val)) : NullDatum.get();
- case FLOAT4:
- return isNotNull(val) ? DatumFactory.createFloat4(new String(val)) : NullDatum.get();
- case FLOAT8:
- return isNotNull(val) ? DatumFactory.createFloat8(new String(val)) : NullDatum.get();
- case CHAR:
- return isNotNullText(val) ? DatumFactory.createChar(new String(val).trim()) : NullDatum.get();
- case TEXT:
- return isNotNullText(val) ? DatumFactory.createText(val) : NullDatum.get();
- case BIT:
- return DatumFactory.createBit(Byte.parseByte(new String(val)));
- case BLOB:
- return DatumFactory.createBlob(Base64.decodeBase64(val));
- case INET4:
- return isNotNull(val) ? DatumFactory.createInet4(new String(val)) : NullDatum.get();
- case PROTOBUF: {
- if (isNotNull(val)) {
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(type);
- Message.Builder builder = factory.newBuilder();
- try {
- ProtobufJsonFormat.getInstance().merge(val, builder);
- return factory.createDatum(builder.build());
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
- }
- } else {
- return NullDatum.get();
- }
- }
- case NULL:
- return NullDatum.get();
- default:
- throw new UnsupportedOperationException(type.toString());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
new file mode 100644
index 0000000..f48c482
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * Reads lines from an {@link InputStream} through an internal read-ahead buffer.
+ * Depending on the constructor used, lines are terminated by:
+ * <ul>
+ * <li>one of '\n' (LF), '\r' (CR), or '\r\n' (CR+LF); <em>or</em></li>
+ * <li>a custom byte-sequence delimiter.</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated line.
+ *
+ * <p>The buffer and its position are mutable and unsynchronized, so an instance is
+ * not thread-safe, and bytes it has already buffered are not visible to anyone who
+ * reads the underlying stream directly.
+ */
+public class LineReader implements Closeable {
+  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private static final byte CR = '\r';
+  private static final byte LF = '\n';
+
+  private final InputStream in;
+  private final byte[] buffer;
+  // the number of bytes of real data in the buffer
+  private int bufferLength = 0;
+  // the current position in the buffer
+  private int bufferPosn = 0;
+
+  // The custom line delimiter, or null to split on CR, LF or CRLF.
+  private final byte[] recordDelimiterBytes;
+  // KMP failure table for recordDelimiterBytes (null when that is null): entry i is the
+  // length of the longest proper prefix of recordDelimiterBytes[0..i] that is also its suffix.
+  private final int[] delimiterFailure;
+
+  /** Destination for the bytes of a line, so both CR/LF readers share one scanner. */
+  private interface LineSink {
+    void append(byte[] bytes, int offset, int length) throws IOException;
+  }
+
+  /**
+   * Creates a line reader over {@code in} with the default buffer size (64 KB),
+   * splitting lines on CR, LF or CRLF.
+   *
+   * @param in the input stream
+   */
+  public LineReader(InputStream in) {
+    this(in, DEFAULT_BUFFER_SIZE, null);
+  }
+
+  /**
+   * Creates a line reader over {@code in} with the given buffer size, splitting
+   * lines on CR, LF or CRLF.
+   *
+   * @param in the input stream
+   * @param bufferSize size of the read buffer; must be positive
+   * @throws IllegalArgumentException if {@code bufferSize} is not positive
+   */
+  public LineReader(InputStream in, int bufferSize) {
+    this(in, bufferSize, null);
+  }
+
+  /**
+   * Creates a line reader over {@code in} whose buffer size is taken from
+   * <code>io.file.buffer.size</code> in {@code conf} (64 KB if unset), splitting
+   * lines on CR, LF or CRLF.
+   *
+   * @param in the input stream
+   * @param conf configuration supplying the buffer size
+   * @throws IOException declared for source compatibility; not thrown by this constructor
+   * @throws IllegalArgumentException if the configured buffer size is not positive
+   */
+  public LineReader(InputStream in, Configuration conf) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), null);
+  }
+
+  /**
+   * Creates a line reader over {@code in} with the default buffer size (64 KB),
+   * splitting lines on a custom delimiter.
+   *
+   * @param in the input stream
+   * @param recordDelimiterBytes the delimiter (copied), or null for CR/LF/CRLF
+   * @throws IllegalArgumentException if {@code recordDelimiterBytes} is empty
+   */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    this(in, DEFAULT_BUFFER_SIZE, recordDelimiterBytes);
+  }
+
+  /**
+   * Creates a line reader over {@code in} with the given buffer size, splitting
+   * lines on a custom delimiter. All other constructors delegate here.
+   *
+   * @param in the input stream
+   * @param bufferSize size of the read buffer; must be positive
+   * @param recordDelimiterBytes the delimiter (copied), or null for CR/LF/CRLF
+   * @throws IllegalArgumentException if {@code bufferSize} is not positive or
+   *                                  {@code recordDelimiterBytes} is empty
+   */
+  public LineReader(InputStream in, int bufferSize,
+                    byte[] recordDelimiterBytes) {
+    if (bufferSize <= 0) {
+      // A zero-length buffer would make every read look like EOF.
+      throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
+    }
+    if (recordDelimiterBytes != null && recordDelimiterBytes.length == 0) {
+      throw new IllegalArgumentException("recordDelimiterBytes must not be empty");
+    }
+    this.in = in;
+    this.buffer = new byte[bufferSize];
+    if (recordDelimiterBytes == null) {
+      this.recordDelimiterBytes = null;
+      this.delimiterFailure = null;
+    } else {
+      // Copy so a later change to the caller's array cannot desynchronize the failure table.
+      this.recordDelimiterBytes = recordDelimiterBytes.clone();
+      this.delimiterFailure = computeFailure(this.recordDelimiterBytes);
+    }
+  }
+
+  /**
+   * Creates a line reader over {@code in} whose buffer size is taken from
+   * <code>io.file.buffer.size</code> in {@code conf} (64 KB if unset), splitting
+   * lines on a custom delimiter.
+   *
+   * @param in the input stream
+   * @param conf configuration supplying the buffer size
+   * @param recordDelimiterBytes the delimiter (copied), or null for CR/LF/CRLF
+   * @throws IOException declared for source compatibility; not thrown by this constructor
+   * @throws IllegalArgumentException if the configured buffer size is not positive or
+   *                                  {@code recordDelimiterBytes} is empty
+   */
+  public LineReader(InputStream in, Configuration conf,
+                    byte[] recordDelimiterBytes) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE), recordDelimiterBytes);
+  }
+
+  /**
+   * Closes the underlying stream.
+   *
+   * @throws IOException if closing the stream fails
+   */
+  @Override
+  public void close() throws IOException {
+    in.close();
+  }
+
+  /**
+   * Discards any buffered but not yet consumed bytes; the next read refills the
+   * buffer from the underlying stream.
+   */
+  public void reset() {
+    bufferLength = 0;
+    bufferPosn = 0;
+  }
+
+  /**
+   * Reads one line from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line (without newline)
+   * @param maxLineLength the maximum number of bytes to store into str;
+   *  the rest of the line is silently discarded.
+   * @param maxBytesToConsume the maximum number of bytes to consume
+   *  in this call. This is only a hint, because if the line crosses
+   *  this threshold, we allow it to happen. It can overshoot
+   *  potentially by as much as one buffer length.
+   * @return the number of bytes read including the (longest) newline
+   * found; 0 when the stream is already at EOF.
+   * @throws IOException if the underlying stream throws, or more than
+   *  {@link Integer#MAX_VALUE} bytes are consumed for one line
+   */
+  public int readLine(Text str, int maxLineLength,
+                      int maxBytesToConsume) throws IOException {
+    if (this.recordDelimiterBytes != null) {
+      return readCustomLine(str, maxLineLength, maxBytesToConsume);
+    } else {
+      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
+    }
+  }
+
+  /**
+   * Reads a line terminated by one of CR, LF, or CRLF into {@code str},
+   * replacing its previous contents.
+   */
+  private int readDefaultLine(final Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    str.clear();
+    return scanDefaultLine(new LineSink() {
+      @Override
+      public void append(byte[] bytes, int offset, int length) {
+        str.append(bytes, offset, length);
+      }
+    }, maxLineLength, maxBytesToConsume);
+  }
+
+  /**
+   * Reads one line terminated by CR, LF or CRLF and appends its bytes (without
+   * the terminator) to {@code str}. Unlike {@link #readLine(Text)}, {@code str}
+   * is not cleared first, so successive calls accumulate lines in one stream.
+   *
+   * @param str the stream the line's bytes are appended to
+   * @param offsets receives the number of bytes this line appended to {@code str};
+   *  nothing is added when no bytes were consumed (EOF)
+   * @param maxLineLength the maximum number of bytes to append; the rest of the
+   *  line is discarded
+   * @param maxBytesToConsume a hint for the maximum number of bytes to consume,
+   *  as for {@link #readLine(Text, int, int)}
+   * @return the number of bytes consumed, including the terminator; 0 at EOF
+   * @throws IOException if the underlying stream throws, or more than
+   *  {@link Integer#MAX_VALUE} bytes are consumed for one line
+   */
+  public int readDefaultLine(final NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets,
+                             int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    // One-element array so the anonymous sink can count what it writes (Java 6/7 capture rules).
+    final int[] lineLength = {0};
+    int bytesConsumed = scanDefaultLine(new LineSink() {
+      @Override
+      public void append(byte[] bytes, int offset, int length) throws IOException {
+        str.write(bytes, offset, length);
+        lineLength[0] += length;
+      }
+    }, maxLineLength, maxBytesToConsume);
+
+    if (bytesConsumed > 0) {
+      offsets.add(lineLength[0]);
+    }
+    return bytesConsumed;
+  }
+
+  /**
+   * Scans one line terminated by CR, LF or CRLF, handing its bytes (without the
+   * terminator, capped at {@code maxLineLength}) to {@code sink}.
+   *
+   * @return the number of bytes consumed, including the terminator
+   * @throws IOException if the underlying stream throws, or more than
+   *  {@link Integer#MAX_VALUE} bytes are consumed
+   */
+  private int scanDefaultLine(LineSink sink, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy it to the sink.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR. In this case we copy everything up to CR, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need to consume LF as well, so the next call will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
+    int txtLength = 0;          // bytes handed to the sink so far
+    int newlineLength = 0;      // length of terminating newline
+    boolean prevCharCR = false; // true if prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; // starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; // account for CR from previous read
+        }
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { // search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { // CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; // CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        sink.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Reads a line terminated by the custom delimiter into {@code str}, replacing
+   * its previous contents.
+   *
+   * <p>The delimiter is matched with the Knuth-Morris-Pratt algorithm, one byte at
+   * a time, so a delimiter whose prefix repeats inside it (e.g. "aab" within "aaab")
+   * and a delimiter split across two buffer fills are both recognised. Bytes that
+   * may start a delimiter are held back; since they always equal a prefix of the
+   * delimiter, they are re-created from {@code recordDelimiterBytes} once they turn
+   * out to be ordinary content (or EOF is reached). While such a partial match is
+   * pending the read continues past {@code maxBytesToConsume}, so held-back bytes
+   * are never dropped. The returned count includes every consumed byte, held-back
+   * and delimiter bytes included.
+   */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    str.clear();
+    final byte[] delimiter = recordDelimiterBytes;
+    int txtLength = 0;      // tracks str.getLength(), as an optimization
+    long bytesConsumed = 0; // every byte taken from the stream, delimiter included
+    int matched = 0;        // delimiter bytes matched at the end of the bytes scanned so far
+    boolean found = false;
+    do {
+      if (bufferPosn >= bufferLength) {
+        bufferPosn = 0;
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          // EOF: a pending partial match was ordinary line content after all.
+          txtLength = appendCapped(str, delimiter, 0, matched, txtLength, maxLineLength);
+          break;
+        }
+      }
+      // Bytes held back from the previous buffer fill; they equal delimiter[0, carried).
+      int carried = matched;
+      int startPosn = bufferPosn;
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        byte b = buffer[bufferPosn];
+        while (matched > 0 && b != delimiter[matched]) {
+          matched = delimiterFailure[matched - 1]; // fall back to the next shorter candidate
+        }
+        if (b == delimiter[matched]) {
+          matched++;
+          if (matched == delimiter.length) {
+            found = true;
+            ++bufferPosn; // the delimiter's last byte is consumed too
+            break;
+          }
+        }
+      }
+      int scanned = bufferPosn - startPosn;
+      bytesConsumed += scanned;
+      // This round's input is the carried bytes followed by buffer[startPosn, bufferPosn).
+      // Its last `matched` bytes are the (partial) delimiter; everything before is content.
+      int content = carried + scanned - matched;
+      int fromCarried = Math.min(content, carried);
+      txtLength = appendCapped(str, delimiter, 0, fromCarried, txtLength, maxLineLength);
+      txtLength = appendCapped(str, buffer, startPosn, content - fromCarried, txtLength,
+          maxLineLength);
+    } while (!found && (matched > 0 || bytesConsumed < maxBytesToConsume));
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Builds the KMP failure table of {@code pattern}: entry i is the length of the
+   * longest proper prefix of {@code pattern[0..i]} that is also a suffix of it.
+   */
+  private static int[] computeFailure(byte[] pattern) {
+    int[] failure = new int[pattern.length];
+    int k = 0;
+    for (int i = 1; i < pattern.length; i++) {
+      while (k > 0 && pattern[i] != pattern[k]) {
+        k = failure[k - 1];
+      }
+      if (pattern[i] == pattern[k]) {
+        k++;
+      }
+      failure[i] = k;
+    }
+    return failure;
+  }
+
+  /**
+   * Appends up to {@code length} bytes of {@code bytes} starting at {@code offset}
+   * to {@code str}, never letting the line grow past {@code maxLineLength}.
+   *
+   * @return the line length after appending
+   */
+  private static int appendCapped(Text str, byte[] bytes, int offset, int length,
+                                  int txtLength, int maxLineLength) {
+    int appendLength = Math.min(length, maxLineLength - txtLength);
+    if (appendLength <= 0) {
+      return txtLength;
+    }
+    str.append(bytes, offset, appendLength);
+    return txtLength + appendLength;
+  }
+
+  /**
+   * Reads from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line
+   * @param maxLineLength the maximum number of bytes to store into str.
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength) throws IOException {
+    return readLine(str, maxLineLength, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Reads from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str) throws IOException {
+    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
index f7e92a2..64b078c 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
@@ -37,8 +37,7 @@ public class TextSerializeDeserialize implements SerializeDeserialize {
@Override
- public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
- throws IOException {
+ public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
byte[] bytes;
int length = 0;
@@ -178,6 +177,7 @@ public class TextSerializeDeserialize implements SerializeDeserialize {
}
default:
datum = NullDatum.get();
+ break;
}
return datum;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 3c47c10..08a181a 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -228,6 +228,6 @@ public class TestCompressionStorages {
tupleCnt++;
}
scanner.close();
- assertEquals(tupleCnt, tupleNum);
+ assertEquals(tupleNum, tupleCnt);
}
}
[3/3] git commit: Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/incubator-tajo into DAG-execplan
Posted by ji...@apache.org.
Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/incubator-tajo into DAG-execplan
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/6532324b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/6532324b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/6532324b
Branch: refs/heads/DAG-execplan
Commit: 6532324b046f15a0a120b53c1ccd6369b97069d9
Parents: 7c97735 1f80cd2
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Nov 28 18:07:06 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Nov 28 18:07:06 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 4 +
.../main/java/org/apache/tajo/util/Bytes.java | 46 +-
.../tajo/engine/function/string/Length.java | 51 ++
.../java/org/apache/tajo/master/TajoMaster.java | 5 +
.../TestStringOperatorsAndFunctions.java | 12 +
.../java/org/apache/tajo/storage/CSVFile.java | 378 +++++--------
.../java/org/apache/tajo/storage/LazyTuple.java | 76 +--
.../org/apache/tajo/storage/LineReader.java | 555 +++++++++++++++++++
.../tajo/storage/TextSerializeDeserialize.java | 4 +-
.../tajo/storage/TestCompressionStorages.java | 2 +-
10 files changed, 791 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
[2/3] git commit: TAJO-308: Implement length(string) function. (hyoungjunkim via hyunsik)
Posted by ji...@apache.org.
TAJO-308: Implement length(string) function. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/1f80cd24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/1f80cd24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/1f80cd24
Branch: refs/heads/DAG-execplan
Commit: 1f80cd24b7d1005aa6874143c827bde72d55da84
Parents: b8435e7
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Nov 28 14:01:00 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Nov 28 14:01:00 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tajo/engine/function/string/Length.java | 51 ++++++++++++++++++++
.../java/org/apache/tajo/master/TajoMaster.java | 5 ++
.../TestStringOperatorsAndFunctions.java | 12 +++++
4 files changed, 70 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f60ee90..0a7dd47 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.8.0 - unreleased
NEW FEATURES
+ TAJO-308: Implement length(string) function. (hyoungjunkim via hyunsik)
+
TAJO-200: RCFile compatible to apache hive. (jinho)
TAJO-176: Implement Tajo JDBC Driver. (Keuntae Park via jihoon)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java
new file mode 100644
index 0000000..5c8b689
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.string;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * SQL function {@code INT4 length(text)}: the number of characters in a string.
+ *
+ * <p>Characters are counted as Unicode code points, so a supplementary character
+ * (a surrogate pair in Java's UTF-16 strings, e.g. most emoji) counts as one.
+ * A NULL argument yields NULL.
+ */
+public class Length extends GeneralFunction {
+
+  public Length() {
+    super(new Column[] {
+        new Column("text", TajoDataTypes.Type.TEXT)
+    });
+  }
+
+  @Override
+  public Datum eval(Tuple params) {
+    Datum datum = params.get(0);
+    if (datum instanceof NullDatum) {
+      return NullDatum.get();
+    }
+
+    // length() on the string counts UTF-16 code units and would report 2 for each
+    // character outside the Basic Multilingual Plane; count code points instead.
+    CharSequence text = datum.asChars();
+    return DatumFactory.createInt4(Character.codePointCount(text, 0, text.length()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index f4118bb..b634f25 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -394,6 +394,11 @@ public class TajoMaster extends CompositeService {
CatalogUtil.newSimpleDataType(Type.TEXT),
CatalogUtil.newSimpleDataTypeArray(Type.TEXT, Type.TEXT, Type.TEXT)));
+ sqlFuncs.add(
+ new FunctionDesc("length", Length.class, FunctionType.GENERAL,
+ CatalogUtil.newSimpleDataType(Type.INT4),
+ CatalogUtil.newSimpleDataTypeArray(Type.TEXT)));
+
return sqlFuncs;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
index 8d259ab..5124173 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
@@ -178,4 +178,16 @@ public class TestStringOperatorsAndFunctions extends ExprTestBase {
testEval(schema, "table1", "ABC,DEF,3.14", "select character_length(lower(col1) || lower(col2)) from table1",
new String[]{"6"});
}
+
+  @Test
+  public void testLength() throws IOException {
+    // length() applied to a string literal.
+    testSimpleEval("select length('123456') as col1 ", new String[]{"6"});
+
+    // length() applied to an expression over a three-column TEXT table:
+    // lower(col1) || lower(col2) for the row "ABC,DEF,3.14" has six characters.
+    Schema tableSchema = new Schema();
+    for (String columnName : new String[]{"col1", "col2", "col3"}) {
+      tableSchema.addColumn(columnName, TEXT);
+    }
+    String query = "select length(lower(col1) || lower(col2)) from table1";
+    testEval(tableSchema, "table1", "ABC,DEF,3.14", query, new String[]{"6"});
+  }
}