You are viewing a plain-text version of this content. The canonical link to it (labelled "here" in the original page) was not preserved in this copy.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:14 UTC
[10/29] tajo git commit: TAJO-1210: ByteBufLineReader does not handle
the end of file if no trailing newline appears. (jinho)
TAJO-1210: ByteBufLineReader does not handle the end of file if no trailing newline appears. (jinho)
Closes #272
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bf68b770
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bf68b770
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bf68b770
Branch: refs/heads/hbase_storage
Commit: bf68b770e6abbb4c63d696e264a348bb1ddb5982
Parents: cd38dff
Author: jhkim <jh...@apache.org>
Authored: Mon Dec 1 11:31:23 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Dec 1 11:31:23 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
tajo-storage/pom.xml | 3 +-
.../tajo/storage/text/ByteBufLineReader.java | 22 ++++++++++++--
.../org/apache/tajo/storage/TestLineReader.java | 32 +++++++++++++++++++-
.../org/apache/tajo/storage/TestStorages.java | 2 +-
.../apache/tajo/storage/avro/TestAvroUtil.java | 2 +-
.../src/test/resources/dataset/testLineText.txt | 2 ++
.../resources/dataset/testVariousTypes.avsc | 20 ++++++++++++
.../src/test/resources/testVariousTypes.avsc | 20 ------------
9 files changed, 78 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 025ae88..33983bc 100644
--- a/CHANGES
+++ b/CHANGES
@@ -79,6 +79,9 @@ Release 0.9.1 - unreleased
BUG FIXES
+ TAJO-1210: ByteBufLineReader does not handle the end of file,
+ if newline is not appeared. (jinho)
+
TAJO-1119: JDBC driver should support TIMESTAMP type. (jaehwa)
TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index ef26a32..7ede2e1 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -71,8 +71,7 @@
</executions>
<configuration>
<excludes>
- <exclude>src/test/resources/testVariousTypes.avsc</exclude>
- <exclude>src/test/resources/dataset/TestJsonSerDe/*.json</exclude>
+ <exclude>src/test/resources/dataset/**</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 1448885..86319e1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -32,6 +32,7 @@ public class ByteBufLineReader implements Closeable {
private int bufferSize;
private long readBytes;
+ private boolean eof = false;
private ByteBuf buffer;
private final ByteBufInputChannel channel;
private final AtomicInteger tempReadBytes = new AtomicInteger();
@@ -92,6 +93,10 @@ public class ByteBufLineReader implements Closeable {
for (; ; ) {
int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes);
if (localReadBytes < 0) {
+ if (tailBytes == readBytes) {
+ // no more bytes are in the channel
+ eof = true;
+ }
break;
}
readBytes += localReadBytes;
@@ -101,7 +106,9 @@ public class ByteBufLineReader implements Closeable {
}
this.readBytes += (readBytes - tailBytes);
release = false;
- this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
+ if (!eof) {
+ this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
+ }
} finally {
if (release) {
buffer.release();
@@ -113,6 +120,8 @@ public class ByteBufLineReader implements Closeable {
* Read a line terminated by one of CR, LF, or CRLF.
*/
public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
+ if(eof) return null;
+
int startIndex = buffer.readerIndex();
int readBytes;
int readable;
@@ -127,14 +136,21 @@ public class ByteBufLineReader implements Closeable {
if (!buffer.isReadable()) {
return null;
} else {
- startIndex = 0; // reset the line start position
+ if (!eof) startIndex = 0; // reset the line start position
+ else startIndex = buffer.readerIndex();
}
readable = buffer.readableBytes();
}
int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
if (endIndex < 0) {
- buffer.readerIndex(buffer.writerIndex());
+ //does not appeared terminating newline
+ buffer.readerIndex(buffer.writerIndex()); // set to end buffer
+ if(eof){
+ readBytes = buffer.readerIndex() - startIndex;
+ newlineLength = 0;
+ break loop;
+ }
} else {
buffer.readerIndex(endIndex + 1);
readBytes = buffer.readerIndex() - startIndex;
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
index ef6efdf..4512d00 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -34,11 +34,14 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.text.ByteBufLineReader;
-import org.apache.tajo.storage.text.DelimitedTextFile;
import org.apache.tajo.storage.text.DelimitedLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
import org.junit.Test;
+import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -160,4 +163,31 @@ public class TestLineReader {
assertEquals(tupleNum, i);
}
+
+ @Test
+ public void testByteBufLineReaderWithoutTerminating() throws IOException {
+ String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
+ File file = new File(path);
+ String data = FileUtil.readTextFile(file);
+
+ ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
+
+ assertEquals(file.length(), channel.available());
+ ByteBufLineReader reader = new ByteBufLineReader(channel);
+ assertEquals(file.length(), reader.available());
+
+ long totalRead = 0;
+ int i = 0;
+ AtomicInteger bytes = new AtomicInteger();
+ for(;;){
+ ByteBuf buf = reader.readLineBuf(bytes);
+ if(buf == null) break;
+ totalRead += bytes.get();
+ i++;
+ }
+ IOUtils.cleanup(null, reader);
+ assertEquals(file.length(), totalRead);
+ assertEquals(file.length(), reader.readBytes());
+ assertEquals(data.split("\n").length, i);
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index c581926..bd1a1f9 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -320,7 +320,7 @@ public class TestStorages {
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
if (storeType == StoreType.AVRO) {
- String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString();
+ String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
index 6186e9e..a79e8ab 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@ -49,7 +49,7 @@ public class TestAvroUtil {
@Before
public void setUp() throws Exception {
- schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc");
+ schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
assertNotNull(schemaUrl);
File file = new File(schemaUrl.getPath());
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/dataset/testLineText.txt
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/testLineText.txt b/tajo-storage/src/test/resources/dataset/testLineText.txt
new file mode 100644
index 0000000..7403c26
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/testLineText.txt
@@ -0,0 +1,2 @@
+1|25|emiya muljomdao
+2|25|emiya muljomdao
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc b/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc
new file mode 100644
index 0000000..d4250a9
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc
@@ -0,0 +1,20 @@
+{
+ "type": "record",
+ "namespace": "org.apache.tajo",
+ "name": "testVariousTypes",
+ "fields": [
+ { "name": "col1", "type": "boolean" },
+ { "name": "col2", "type": "string" },
+ { "name": "col3", "type": "int" },
+ { "name": "col4", "type": "int" },
+ { "name": "col5", "type": "long" },
+ { "name": "col6", "type": "float" },
+ { "name": "col7", "type": "double" },
+ { "name": "col8", "type": "string" },
+ { "name": "col9", "type": "bytes" },
+ { "name": "col10", "type": "bytes" },
+ { "name": "col11", "type": "null" },
+ { "name": "col12", "type": "bytes" }
+ ]
+}
+
http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc
deleted file mode 100644
index d4250a9..0000000
--- a/tajo-storage/src/test/resources/testVariousTypes.avsc
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "type": "record",
- "namespace": "org.apache.tajo",
- "name": "testVariousTypes",
- "fields": [
- { "name": "col1", "type": "boolean" },
- { "name": "col2", "type": "string" },
- { "name": "col3", "type": "int" },
- { "name": "col4", "type": "int" },
- { "name": "col5", "type": "long" },
- { "name": "col6", "type": "float" },
- { "name": "col7", "type": "double" },
- { "name": "col8", "type": "string" },
- { "name": "col9", "type": "bytes" },
- { "name": "col10", "type": "bytes" },
- { "name": "col11", "type": "null" },
- { "name": "col12", "type": "bytes" }
- ]
-}
-