You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/10 15:08:31 UTC
[1/5] tajo git commit: TAJO-1236: Remove slow 'new String' operation in parquet format. (jinho)
Repository: tajo
Updated Branches:
refs/heads/index_support 761e372ba -> 28151a965
TAJO-1236: Remove slow 'new String' operation in parquet format. (jinho)
Closes #292
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/661c7e21
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/661c7e21
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/661c7e21
Branch: refs/heads/index_support
Commit: 661c7e216d664c71d2c889082703ecc6ae028dc3
Parents: 8f68b4b
Author: jhkim <jh...@apache.org>
Authored: Tue Dec 9 14:42:02 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Tue Dec 9 14:42:02 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../storage/parquet/TajoRecordConverter.java | 34 ++++++++------------
.../tajo/storage/parquet/TajoWriteSupport.java | 23 +++++++------
3 files changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/661c7e21/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 59a8f68..89d39bc 100644
--- a/CHANGES
+++ b/CHANGES
@@ -18,6 +18,9 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1236: Remove slow 'new String' operation in parquet format.
+ (jinho)
+
TAJO-1230: Disable ipv6 support on JVM. (Jihun Kang via hyunsik)
TAJO-1213: Implement CatalogStore::updateTableStats. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/661c7e21/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7c3d79d..a091eac 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -18,29 +18,23 @@
package org.apache.tajo.storage.parquet;
-import com.google.protobuf.Message;
import com.google.protobuf.InvalidProtocolBufferException;
-
-import java.nio.ByteBuffer;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.Converter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.io.api.Binary;
-import parquet.schema.Type;
-import parquet.schema.GroupType;
-
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.*;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.BlobDatum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
+
+import java.nio.ByteBuffer;
/**
* Converter to convert a Parquet record into a Tajo Tuple.
@@ -222,7 +216,7 @@ public class TajoRecordConverter extends GroupConverter {
@Override
final public void addBinary(Binary value) {
- parent.add(DatumFactory.createChar(value.toStringUsingUTF8()));
+ parent.add(DatumFactory.createChar(value.getBytes()));
}
}
@@ -343,7 +337,7 @@ public class TajoRecordConverter extends GroupConverter {
@Override
final public void addBinary(Binary value) {
- parent.add(DatumFactory.createText(value.toStringUsingUTF8()));
+ parent.add(DatumFactory.createText(value.getBytes()));
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/661c7e21/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index 35165de..8651131 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -18,10 +18,12 @@
package org.apache.tajo.storage.parquet;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
import parquet.hadoop.api.WriteSupport;
import parquet.io.api.Binary;
import parquet.io.api.RecordConsumer;
@@ -29,12 +31,9 @@ import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.Type;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.datum.Datum;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
/**
* Tajo implementation of {@link WriteSupport} for {@link Tuple}s.
@@ -116,7 +115,7 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
private void writeValue(Type fieldType, Column column, Datum datum) {
switch (column.getDataType().getType()) {
case BOOLEAN:
- recordConsumer.addBoolean((Boolean) datum.asBool());
+ recordConsumer.addBoolean(datum.asBool());
break;
case BIT:
case INT2:
@@ -134,7 +133,7 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
break;
case CHAR:
case TEXT:
- recordConsumer.addBinary(Binary.fromString(datum.asChars()));
+ recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
break;
case PROTOBUF:
case BLOB:
[3/5] tajo git commit: TAJO-1237: Fix missing maven-module for pullserver. (jinho)
Posted by ji...@apache.org.
TAJO-1237: Fix missing maven-module for pullserver. (jinho)
Closes #293
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72256fcf
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72256fcf
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72256fcf
Branch: refs/heads/index_support
Commit: 72256fcff2c77222e278cb690efb8f669e926a29
Parents: ac6799f
Author: jhkim <jh...@apache.org>
Authored: Wed Dec 10 11:20:47 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Wed Dec 10 11:20:47 2014 +0900
----------------------------------------------------------------------
BUILDING | 2 +-
CHANGES | 2 ++
tajo-core/pom.xml | 2 +-
tajo-dist/pom.xml | 2 +-
tajo-project/pom.xml | 2 +-
tajo-pullserver/pom.xml | 4 ++--
6 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/BUILDING
----------------------------------------------------------------------
diff --git a/BUILDING b/BUILDING
index 974fac8..8606d80 100644
--- a/BUILDING
+++ b/BUILDING
@@ -20,7 +20,7 @@ Maven main modules:
- tajo-rpc
- tajo-catalog
- tajo-storage
- - tajo-yarn-pullserver
+ - tajo-pullserver
- tajo-core
- tajo-client
- tajo-jdbc
http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 89d39bc..b2985ba 100644
--- a/CHANGES
+++ b/CHANGES
@@ -90,6 +90,8 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1237: Fix missing maven-module for pullserver. (jinho)
+
TAJO-1196: Unit test hangs occasionally and randomly. (jihoon)
TAJO-1234: Rearrange timezone in date/time types. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 060ac1b..04706b6 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -250,7 +250,7 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-yarn-pullserver</artifactId>
+ <artifactId>tajo-pullserver</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d350889..40b6c56 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -130,7 +130,7 @@
run cp -r $ROOT/tajo-client/target/tajo-client-${project.version}/* .
run cp -r $ROOT/tajo-catalog/target/tajo-catalog-${project.version}/* .
run cp -r $ROOT/tajo-storage/target/tajo-storage-${project.version}/* .
- run cp -r $ROOT/tajo-yarn-pullserver/target/tajo-yarn-pullserver-${project.version}.jar .
+ run cp -r $ROOT/tajo-pullserver/target/tajo-pullserver-${project.version}.jar .
run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar .
run cp -r $ROOT/tajo-core/target/lib .
run cp -r ${project.basedir}/src/main/bin .
http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index eaa54b4..63b0aec 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -746,7 +746,7 @@
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
- <artifactId>tajo-yarn-pullserver</artifactId>
+ <artifactId>tajo-pullserver</artifactId>
<version>${tajo.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index a7644a1..b9c6624 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -25,8 +25,8 @@
<relativePath>../tajo-project</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <name>Tajo Core PullServer</name>
- <artifactId>tajo-yarn-pullserver</artifactId>
+ <name>Tajo PullServer</name>
+ <artifactId>tajo-pullserver</artifactId>
<build>
<plugins>
[2/5] tajo git commit: Updating outdated affiliations.
Posted by ji...@apache.org.
Updating outdated affiliations.
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ac6799f0
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ac6799f0
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ac6799f0
Branch: refs/heads/index_support
Commit: ac6799f0ebfdd56b484611ea28dada8880406d19
Parents: 661c7e2
Author: Jakob Homan <jg...@gmail.com>
Authored: Tue Dec 9 16:32:03 2014 -0800
Committer: Jakob Homan <jg...@gmail.com>
Committed: Tue Dec 9 16:32:03 2014 -0800
----------------------------------------------------------------------
tajo-project/pom.xml | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/ac6799f0/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 14a81c3..eaa54b4 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -87,7 +87,7 @@
<id>ereisman</id>
<name>Eli Reisman</name>
<email>ereisman@apache.org</email>
- <organization>Hortonworks</organization>
+ <organization>Etsy</organization>
<roles>
<role>PMC</role>
</roles>
@@ -136,7 +136,7 @@
<id>jghoman</id>
<name>Jakob Homan</name>
<email>jghoman@apache.org</email>
- <organization>LinkedIn</organization>
+ <organization>Microsoft</organization>
<roles>
<role>PMC</role>
</roles>
@@ -166,7 +166,7 @@
<id>mzhou</id>
<name>Min Zhou</name>
<email>mzhou@apache.org</email>
- <organization>LinkedIn</organization>
+ <organization>Tango Me</organization>
<roles>
<role>Committer</role>
</roles>
@@ -196,7 +196,7 @@
<id>rsumbaly</id>
<name>Roshan Sumbaly</name>
<email>rsumbaly@apache.org</email>
- <organization>LinkedIn</organization>
+ <organization>Coursera</organization>
<roles>
<role>PMC</role>
</roles>
@@ -236,7 +236,7 @@
</contributor>
<contributor>
<name>David Chen</name>
- <organization>LinkedIn</organization>
+ <organization>Google</organization>
<roles><role>Contributor</role></roles>
</contributor>
<contributor>
[4/5] tajo git commit: TAJO-1235: ByteBufLineReader can not read text line with CRLF. (jinho)
Posted by ji...@apache.org.
TAJO-1235: ByteBufLineReader can not read text line with CRLF. (jinho)
Closes #289
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bebc7801
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bebc7801
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bebc7801
Branch: refs/heads/index_support
Commit: bebc78011798cd2fd691e8901141e94f1d445d6e
Parents: 72256fc
Author: jhkim <jh...@apache.org>
Authored: Wed Dec 10 11:25:03 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Wed Dec 10 11:25:03 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 +
.../tajo/storage/ByteBufInputChannel.java | 4 --
.../tajo/storage/text/ByteBufLineReader.java | 68 ++++++++++++--------
.../tajo/storage/text/DelimitedLineReader.java | 7 +-
.../org/apache/tajo/storage/TestLineReader.java | 43 ++++++++++---
5 files changed, 81 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b2985ba..710f463 100644
--- a/CHANGES
+++ b/CHANGES
@@ -90,6 +90,9 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1235: ByteBufLineReader can not read text line with CRLF.
+ (jinho)
+
TAJO-1237: Fix missing maven-module for pullserver. (jinho)
TAJO-1196: Unit test hangs occasionally and randomly. (jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
index b1b6d65..45fb1d8 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
@@ -69,8 +69,4 @@ public class ByteBufInputChannel extends AbstractInterruptibleChannel implements
protected void implCloseChannel() throws IOException {
IOUtils.cleanup(null, channel, inputStream);
}
-
- public int available() throws IOException {
- return inputStream.available();
- }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 86319e1..2f742c6 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -32,10 +32,11 @@ public class ByteBufLineReader implements Closeable {
private int bufferSize;
private long readBytes;
+ private int startIndex;
private boolean eof = false;
private ByteBuf buffer;
private final ByteBufInputChannel channel;
- private final AtomicInteger tempReadBytes = new AtomicInteger();
+ private final AtomicInteger lineReadBytes = new AtomicInteger();
private final LineSplitProcessor processor = new LineSplitProcessor();
public ByteBufLineReader(ByteBufInputChannel channel) {
@@ -53,10 +54,6 @@ public class ByteBufLineReader implements Closeable {
return readBytes - buffer.readableBytes();
}
- public long available() throws IOException {
- return channel.available() + buffer.readableBytes();
- }
-
@Override
public void close() throws IOException {
if (this.buffer.refCnt() > 0) {
@@ -66,7 +63,7 @@ public class ByteBufLineReader implements Closeable {
}
public String readLine() throws IOException {
- ByteBuf buf = readLineBuf(tempReadBytes);
+ ByteBuf buf = readLineBuf(lineReadBytes);
if (buf != null) {
return buf.toString(CharsetUtil.UTF_8);
}
@@ -77,24 +74,26 @@ public class ByteBufLineReader implements Closeable {
int tailBytes = 0;
if (this.readBytes > 0) {
+ //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes)
this.buffer.markReaderIndex();
- this.buffer.discardSomeReadBytes(); // compact the buffer
+ this.buffer.discardReadBytes(); // compact the buffer
tailBytes = this.buffer.writerIndex();
if (!this.buffer.isWritable()) {
// a line bytes is large than the buffer
- BufferPool.ensureWritable(buffer, bufferSize);
+ BufferPool.ensureWritable(buffer, bufferSize * 2);
this.bufferSize = buffer.capacity();
}
+ this.startIndex = 0;
}
boolean release = true;
try {
int readBytes = tailBytes;
for (; ; ) {
- int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes);
+ int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes);
if (localReadBytes < 0) {
- if (tailBytes == readBytes) {
- // no more bytes are in the channel
+ if (buffer.isWritable()) {
+ //if read bytes is less than the buffer capacity, there is no more bytes in the channel
eof = true;
}
break;
@@ -106,9 +105,8 @@ public class ByteBufLineReader implements Closeable {
}
this.readBytes += (readBytes - tailBytes);
release = false;
- if (!eof) {
- this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
- }
+
+ this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
} finally {
if (release) {
buffer.release();
@@ -120,24 +118,36 @@ public class ByteBufLineReader implements Closeable {
* Read a line terminated by one of CR, LF, or CRLF.
*/
public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
- if(eof) return null;
-
- int startIndex = buffer.readerIndex();
- int readBytes;
+ int readBytes = 0; // newline + text line bytes
+ int newlineLength = 0; //length of terminating newline
int readable;
- int newlineLength; //length of terminating newline
+
+ this.startIndex = buffer.readerIndex();
loop:
while (true) {
readable = buffer.readableBytes();
if (readable <= 0) {
- buffer.readerIndex(startIndex);
+ buffer.readerIndex(this.startIndex);
fillBuffer(); //compact and fill buffer
- if (!buffer.isReadable()) {
+
+ //if buffer.writerIndex() is zero, there is no bytes in buffer
+ if (!buffer.isReadable() && buffer.writerIndex() == 0) {
+ reads.set(0);
return null;
} else {
- if (!eof) startIndex = 0; // reset the line start position
- else startIndex = buffer.readerIndex();
+ //skip first newLine
+ if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+ buffer.skipBytes(1);
+ if(eof && !buffer.isReadable()) {
+ reads.set(1);
+ return null;
+ }
+
+ newlineLength++;
+ readBytes++;
+ startIndex = buffer.readerIndex();
+ }
}
readable = buffer.readableBytes();
}
@@ -147,19 +157,21 @@ public class ByteBufLineReader implements Closeable {
//does not appeared terminating newline
buffer.readerIndex(buffer.writerIndex()); // set to end buffer
if(eof){
- readBytes = buffer.readerIndex() - startIndex;
- newlineLength = 0;
+ readBytes += (buffer.readerIndex() - startIndex);
break loop;
}
} else {
buffer.readerIndex(endIndex + 1);
- readBytes = buffer.readerIndex() - startIndex;
+ readBytes += (buffer.readerIndex() - startIndex); //past newline + text line
+
+ //appeared terminating CRLF
if (processor.isPrevCharCR() && buffer.isReadable()
&& buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
buffer.skipBytes(1);
- newlineLength = 2;
+ readBytes++;
+ newlineLength += 2;
} else {
- newlineLength = 1;
+ newlineLength += 1;
}
break loop;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index eb1929e..0efe030 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -57,7 +57,7 @@ public class DelimitedLineReader implements Closeable {
private long startOffset, end, pos;
private boolean eof = true;
private ByteBufLineReader lineReader;
- private AtomicInteger tempReadBytes = new AtomicInteger();
+ private AtomicInteger lineReadBytes = new AtomicInteger();
private FileFragment fragment;
private Configuration conf;
@@ -122,11 +122,10 @@ public class DelimitedLineReader implements Closeable {
return null;
}
- ByteBuf buf = lineReader.readLineBuf(tempReadBytes);
+ ByteBuf buf = lineReader.readLineBuf(lineReadBytes);
+ pos += lineReadBytes.get();
if (buf == null) {
eof = true;
- } else {
- pos += tempReadBytes.get();
}
if (!isCompressed() && getCompressedPosition() > end) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 4512d00..bfaba04 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -19,6 +19,7 @@
package org.apache.tajo.storage;
import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
@@ -84,18 +86,15 @@ public class TestLineReader {
FileStatus status = fs.getFileStatus(tablePath);
ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
- assertEquals(status.getLen(), channel.available());
ByteBufLineReader reader = new ByteBufLineReader(channel);
- assertEquals(status.getLen(), reader.available());
long totalRead = 0;
int i = 0;
AtomicInteger bytes = new AtomicInteger();
for(;;){
ByteBuf buf = reader.readLineBuf(bytes);
- if(buf == null) break;
-
totalRead += bytes.get();
+ if(buf == null) break;
i++;
}
IOUtils.cleanup(null, reader, channel, fs);
@@ -171,18 +170,15 @@ public class TestLineReader {
String data = FileUtil.readTextFile(file);
ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
-
- assertEquals(file.length(), channel.available());
ByteBufLineReader reader = new ByteBufLineReader(channel);
- assertEquals(file.length(), reader.available());
long totalRead = 0;
int i = 0;
AtomicInteger bytes = new AtomicInteger();
for(;;){
ByteBuf buf = reader.readLineBuf(bytes);
- if(buf == null) break;
totalRead += bytes.get();
+ if(buf == null) break;
i++;
}
IOUtils.cleanup(null, reader);
@@ -190,4 +186,35 @@ public class TestLineReader {
assertEquals(file.length(), reader.readBytes());
assertEquals(data.split("\n").length, i);
}
+
+ @Test
+ public void testCRLFLine() throws IOException {
+ TajoConf conf = new TajoConf();
+ Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt");
+
+ FileSystem fs = testFile.getFileSystem(conf);
+ FSDataOutputStream outputStream = fs.create(testFile, true);
+ outputStream.write("0\r\n1\r\n".getBytes());
+ outputStream.flush();
+ IOUtils.closeStream(outputStream);
+
+ ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
+ ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2));
+ FileStatus status = fs.getFileStatus(testFile);
+
+ long totalRead = 0;
+ int i = 0;
+ AtomicInteger bytes = new AtomicInteger();
+ for(;;){
+ ByteBuf buf = reader.readLineBuf(bytes);
+ totalRead += bytes.get();
+ if(buf == null) break;
+ String row = buf.toString(Charset.defaultCharset());
+ assertEquals(i, Integer.parseInt(row));
+ i++;
+ }
+ IOUtils.cleanup(null, reader);
+ assertEquals(status.getLen(), totalRead);
+ assertEquals(status.getLen(), reader.readBytes());
+ }
}
[5/5] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/28151a96
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/28151a96
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/28151a96
Branch: refs/heads/index_support
Commit: 28151a965aacd3c064642047d22df2216c74bfeb
Parents: 761e372 bebc780
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Dec 10 23:08:17 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Dec 10 23:08:17 2014 +0900
----------------------------------------------------------------------
BUILDING | 2 +-
CHANGES | 8 +++
tajo-core/pom.xml | 2 +-
tajo-dist/pom.xml | 2 +-
tajo-project/pom.xml | 12 ++--
tajo-pullserver/pom.xml | 4 +-
.../tajo/storage/ByteBufInputChannel.java | 4 --
.../storage/parquet/TajoRecordConverter.java | 34 ++++------
.../tajo/storage/parquet/TajoWriteSupport.java | 23 ++++---
.../tajo/storage/text/ByteBufLineReader.java | 68 ++++++++++++--------
.../tajo/storage/text/DelimitedLineReader.java | 7 +-
.../org/apache/tajo/storage/TestLineReader.java | 43 ++++++++++---
12 files changed, 122 insertions(+), 87 deletions(-)
----------------------------------------------------------------------