You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/02 04:56:45 UTC

[12/14] tajo git commit: TAJO-1210: ByteBufLineReader does not handle the end of file if the last line has no terminating newline. (jinho)

TAJO-1210: ByteBufLineReader does not handle the end of file if the last line has no terminating newline. (jinho)

Closes #272


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bf68b770
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bf68b770
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bf68b770

Branch: refs/heads/index_support
Commit: bf68b770e6abbb4c63d696e264a348bb1ddb5982
Parents: cd38dff
Author: jhkim <jh...@apache.org>
Authored: Mon Dec 1 11:31:23 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Mon Dec 1 11:31:23 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 tajo-storage/pom.xml                            |  3 +-
 .../tajo/storage/text/ByteBufLineReader.java    | 22 ++++++++++++--
 .../org/apache/tajo/storage/TestLineReader.java | 32 +++++++++++++++++++-
 .../org/apache/tajo/storage/TestStorages.java   |  2 +-
 .../apache/tajo/storage/avro/TestAvroUtil.java  |  2 +-
 .../src/test/resources/dataset/testLineText.txt |  2 ++
 .../resources/dataset/testVariousTypes.avsc     | 20 ++++++++++++
 .../src/test/resources/testVariousTypes.avsc    | 20 ------------
 9 files changed, 78 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 025ae88..33983bc 100644
--- a/CHANGES
+++ b/CHANGES
@@ -79,6 +79,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1210: ByteBufLineReader does not handle the end of file, 
+    if newline is not appeared. (jinho)
+
     TAJO-1119: JDBC driver should support TIMESTAMP type. (jaehwa)
 
     TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index ef26a32..7ede2e1 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -71,8 +71,7 @@
         </executions>
         <configuration>
           <excludes>
-            <exclude>src/test/resources/testVariousTypes.avsc</exclude>
-            <exclude>src/test/resources/dataset/TestJsonSerDe/*.json</exclude>
+            <exclude>src/test/resources/dataset/**</exclude>
           </excludes>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 1448885..86319e1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -32,6 +32,7 @@ public class ByteBufLineReader implements Closeable {
 
   private int bufferSize;
   private long readBytes;
+  private boolean eof = false;
   private ByteBuf buffer;
   private final ByteBufInputChannel channel;
   private final AtomicInteger tempReadBytes = new AtomicInteger();
@@ -92,6 +93,10 @@ public class ByteBufLineReader implements Closeable {
       for (; ; ) {
         int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes);
         if (localReadBytes < 0) {
+          if (tailBytes == readBytes) {
+            // no more bytes are in the channel
+            eof = true;
+          }
           break;
         }
         readBytes += localReadBytes;
@@ -101,7 +106,9 @@ public class ByteBufLineReader implements Closeable {
       }
       this.readBytes += (readBytes - tailBytes);
       release = false;
-      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
+      if (!eof) {
+        this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
+      }
     } finally {
       if (release) {
         buffer.release();
@@ -113,6 +120,8 @@ public class ByteBufLineReader implements Closeable {
    * Read a line terminated by one of CR, LF, or CRLF.
    */
   public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
+    if(eof) return null;
+
     int startIndex = buffer.readerIndex();
     int readBytes;
     int readable;
@@ -127,14 +136,21 @@ public class ByteBufLineReader implements Closeable {
         if (!buffer.isReadable()) {
           return null;
         } else {
-          startIndex = 0; // reset the line start position
+          if (!eof) startIndex = 0; // reset the line start position
+          else startIndex = buffer.readerIndex();
         }
         readable = buffer.readableBytes();
       }
 
       int endIndex = buffer.forEachByte(buffer.readerIndex(), readable, processor);
       if (endIndex < 0) {
-        buffer.readerIndex(buffer.writerIndex());
+        //does not appeared terminating newline
+        buffer.readerIndex(buffer.writerIndex()); // set to end buffer
+        if(eof){
+          readBytes = buffer.readerIndex() - startIndex;
+          newlineLength = 0;
+          break loop;
+        }
       } else {
         buffer.readerIndex(endIndex + 1);
         readBytes = buffer.readerIndex() - startIndex;

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
index ef6efdf..4512d00 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -34,11 +34,14 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.text.ByteBufLineReader;
-import org.apache.tajo.storage.text.DelimitedTextFile;
 import org.apache.tajo.storage.text.DelimitedLineReader;
+import org.apache.tajo.storage.text.DelimitedTextFile;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.FileUtil;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -160,4 +163,31 @@ public class TestLineReader {
     assertEquals(tupleNum, i);
 
   }
+
+  @Test
+  public void testByteBufLineReaderWithoutTerminating() throws IOException {
+    String path = FileUtil.getResourcePath("dataset/testLineText.txt").getFile();
+    File file = new File(path);
+    String data = FileUtil.readTextFile(file);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
+
+    assertEquals(file.length(), channel.available());
+    ByteBufLineReader reader = new ByteBufLineReader(channel);
+    assertEquals(file.length(), reader.available());
+
+    long totalRead = 0;
+    int i = 0;
+    AtomicInteger bytes = new AtomicInteger();
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      if(buf == null) break;
+      totalRead += bytes.get();
+      i++;
+    }
+    IOUtils.cleanup(null, reader);
+    assertEquals(file.length(), totalRead);
+    assertEquals(file.length(), reader.readBytes());
+    assertEquals(data.split("\n").length, i);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index c581926..bd1a1f9 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -320,7 +320,7 @@ public class TestStorages {
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
     if (storeType == StoreType.AVRO) {
-      String path = FileUtil.getResourcePath("testVariousTypes.avsc").toString();
+      String path = FileUtil.getResourcePath("dataset/testVariousTypes.avsc").toString();
       meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
index 6186e9e..a79e8ab 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/avro/TestAvroUtil.java
@@ -49,7 +49,7 @@ public class TestAvroUtil {
 
   @Before
   public void setUp() throws Exception {
-    schemaUrl = FileUtil.getResourcePath("testVariousTypes.avsc");
+    schemaUrl = FileUtil.getResourcePath("dataset/testVariousTypes.avsc");
     assertNotNull(schemaUrl);
 
     File file = new File(schemaUrl.getPath());

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/dataset/testLineText.txt
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/testLineText.txt b/tajo-storage/src/test/resources/dataset/testLineText.txt
new file mode 100644
index 0000000..7403c26
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/testLineText.txt
@@ -0,0 +1,2 @@
+1|25|emiya muljomdao
+2|25|emiya muljomdao
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc b/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc
new file mode 100644
index 0000000..d4250a9
--- /dev/null
+++ b/tajo-storage/src/test/resources/dataset/testVariousTypes.avsc
@@ -0,0 +1,20 @@
+{
+  "type": "record",
+  "namespace": "org.apache.tajo",
+  "name": "testVariousTypes",
+  "fields": [
+    { "name": "col1", "type": "boolean" },
+    { "name": "col2", "type": "string" },
+    { "name": "col3", "type": "int" },
+    { "name": "col4", "type": "int" },
+    { "name": "col5", "type": "long" },
+    { "name": "col6", "type": "float" },
+    { "name": "col7", "type": "double" },
+    { "name": "col8", "type": "string" },
+    { "name": "col9", "type": "bytes" },
+    { "name": "col10", "type": "bytes" },
+    { "name": "col11", "type": "null" },
+    { "name": "col12", "type": "bytes" }
+  ]
+}
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/bf68b770/tajo-storage/src/test/resources/testVariousTypes.avsc
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/testVariousTypes.avsc b/tajo-storage/src/test/resources/testVariousTypes.avsc
deleted file mode 100644
index d4250a9..0000000
--- a/tajo-storage/src/test/resources/testVariousTypes.avsc
+++ /dev/null
@@ -1,20 +0,0 @@
-{
-  "type": "record",
-  "namespace": "org.apache.tajo",
-  "name": "testVariousTypes",
-  "fields": [
-    { "name": "col1", "type": "boolean" },
-    { "name": "col2", "type": "string" },
-    { "name": "col3", "type": "int" },
-    { "name": "col4", "type": "int" },
-    { "name": "col5", "type": "long" },
-    { "name": "col6", "type": "float" },
-    { "name": "col7", "type": "double" },
-    { "name": "col8", "type": "string" },
-    { "name": "col9", "type": "bytes" },
-    { "name": "col10", "type": "bytes" },
-    { "name": "col11", "type": "null" },
-    { "name": "col12", "type": "bytes" }
-  ]
-}
-