You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/10 15:08:31 UTC

[1/5] tajo git commit: TAJO-1236: Remove slow 'new String' operation in parquet format. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/index_support 761e372ba -> 28151a965


TAJO-1236: Remove slow 'new String' operation in parquet format. (jinho)

Closes #292


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/661c7e21
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/661c7e21
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/661c7e21

Branch: refs/heads/index_support
Commit: 661c7e216d664c71d2c889082703ecc6ae028dc3
Parents: 8f68b4b
Author: jhkim <jh...@apache.org>
Authored: Tue Dec 9 14:42:02 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Tue Dec 9 14:42:02 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../storage/parquet/TajoRecordConverter.java    | 34 ++++++++------------
 .../tajo/storage/parquet/TajoWriteSupport.java  | 23 +++++++------
 3 files changed, 28 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/661c7e21/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 59a8f68..89d39bc 100644
--- a/CHANGES
+++ b/CHANGES
@@ -18,6 +18,9 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1236: Remove slow 'new String' operation in parquet format. 
+    (jinho)
+
     TAJO-1230: Disable ipv6 support on JVM. (Jihun Kang via hyunsik)
 
     TAJO-1213: Implement CatalogStore::updateTableStats. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/661c7e21/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7c3d79d..a091eac 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -18,29 +18,23 @@
 
 package org.apache.tajo.storage.parquet;
 
-import com.google.protobuf.Message;
 import com.google.protobuf.InvalidProtocolBufferException;
-
-import java.nio.ByteBuffer;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.Converter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.io.api.Binary;
-import parquet.schema.Type;
-import parquet.schema.GroupType;
-
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.*;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.BlobDatum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatumFactory;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
+
+import java.nio.ByteBuffer;
 
 /**
  * Converter to convert a Parquet record into a Tajo Tuple.
@@ -222,7 +216,7 @@ public class TajoRecordConverter extends GroupConverter {
 
     @Override
     final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createChar(value.toStringUsingUTF8()));
+      parent.add(DatumFactory.createChar(value.getBytes()));
     }
   }
 
@@ -343,7 +337,7 @@ public class TajoRecordConverter extends GroupConverter {
 
     @Override
     final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createText(value.toStringUsingUTF8()));
+      parent.add(DatumFactory.createText(value.getBytes()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/661c7e21/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index 35165de..8651131 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -18,10 +18,12 @@
 
 package org.apache.tajo.storage.parquet;
 
-import java.util.Map;
-import java.util.HashMap;
-import java.util.List;
-
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
 import parquet.hadoop.api.WriteSupport;
 import parquet.io.api.Binary;
 import parquet.io.api.RecordConsumer;
@@ -29,12 +31,9 @@ import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.Type;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.datum.Datum;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Tajo implementation of {@link WriteSupport} for {@link Tuple}s.
@@ -116,7 +115,7 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
   private void writeValue(Type fieldType, Column column, Datum datum) {
     switch (column.getDataType().getType()) {
       case BOOLEAN:
-        recordConsumer.addBoolean((Boolean) datum.asBool());
+        recordConsumer.addBoolean(datum.asBool());
         break;
       case BIT:
       case INT2:
@@ -134,7 +133,7 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
         break;
       case CHAR:
       case TEXT:
-        recordConsumer.addBinary(Binary.fromString(datum.asChars()));
+        recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
         break;
       case PROTOBUF:
       case BLOB:


[3/5] tajo git commit: TAJO-1237: Fix missing maven-module for pullserver. (jinho)

Posted by ji...@apache.org.
TAJO-1237: Fix missing maven-module for pullserver. (jinho)

Closes #293


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72256fcf
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72256fcf
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72256fcf

Branch: refs/heads/index_support
Commit: 72256fcff2c77222e278cb690efb8f669e926a29
Parents: ac6799f
Author: jhkim <jh...@apache.org>
Authored: Wed Dec 10 11:20:47 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Wed Dec 10 11:20:47 2014 +0900

----------------------------------------------------------------------
 BUILDING                | 2 +-
 CHANGES                 | 2 ++
 tajo-core/pom.xml       | 2 +-
 tajo-dist/pom.xml       | 2 +-
 tajo-project/pom.xml    | 2 +-
 tajo-pullserver/pom.xml | 4 ++--
 6 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/BUILDING
----------------------------------------------------------------------
diff --git a/BUILDING b/BUILDING
index 974fac8..8606d80 100644
--- a/BUILDING
+++ b/BUILDING
@@ -20,7 +20,7 @@ Maven main modules:
          - tajo-rpc
          - tajo-catalog
          - tajo-storage
-         - tajo-yarn-pullserver
+         - tajo-pullserver
          - tajo-core
          - tajo-client
          - tajo-jdbc

http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 89d39bc..b2985ba 100644
--- a/CHANGES
+++ b/CHANGES
@@ -90,6 +90,8 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1237: Fix missing maven-module for pullserver. (jinho)
+
     TAJO-1196: Unit test hangs occasionally and randomly. (jihoon)
 
     TAJO-1234: Rearrange timezone in date/time types. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 060ac1b..04706b6 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -250,7 +250,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>
-      <artifactId>tajo-yarn-pullserver</artifactId>
+      <artifactId>tajo-pullserver</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.tajo</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-dist/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml
index d350889..40b6c56 100644
--- a/tajo-dist/pom.xml
+++ b/tajo-dist/pom.xml
@@ -130,7 +130,7 @@
                       run cp -r $ROOT/tajo-client/target/tajo-client-${project.version}/* .
                       run cp -r $ROOT/tajo-catalog/target/tajo-catalog-${project.version}/* .
                       run cp -r $ROOT/tajo-storage/target/tajo-storage-${project.version}/* .
-                      run cp -r $ROOT/tajo-yarn-pullserver/target/tajo-yarn-pullserver-${project.version}.jar .
+                      run cp -r $ROOT/tajo-pullserver/target/tajo-pullserver-${project.version}.jar .
                       run cp -r $ROOT/tajo-core/target/tajo-core-${project.version}.jar .
                       run cp -r $ROOT/tajo-core/target/lib .
                       run cp -r ${project.basedir}/src/main/bin .

http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index eaa54b4..63b0aec 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -746,7 +746,7 @@
       </dependency>
       <dependency>
         <groupId>org.apache.tajo</groupId>
-        <artifactId>tajo-yarn-pullserver</artifactId>
+        <artifactId>tajo-pullserver</artifactId>
         <version>${tajo.version}</version>
       </dependency>
       <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/72256fcf/tajo-pullserver/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml
index a7644a1..b9c6624 100644
--- a/tajo-pullserver/pom.xml
+++ b/tajo-pullserver/pom.xml
@@ -25,8 +25,8 @@
     <relativePath>../tajo-project</relativePath>
   </parent>
   <modelVersion>4.0.0</modelVersion>
-  <name>Tajo Core PullServer</name>
-  <artifactId>tajo-yarn-pullserver</artifactId>
+  <name>Tajo PullServer</name>
+  <artifactId>tajo-pullserver</artifactId>
 
   <build>
     <plugins>


[2/5] tajo git commit: Updating outdated affiliations.

Posted by ji...@apache.org.
Updating outdated affiliations.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ac6799f0
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ac6799f0
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ac6799f0

Branch: refs/heads/index_support
Commit: ac6799f0ebfdd56b484611ea28dada8880406d19
Parents: 661c7e2
Author: Jakob Homan <jg...@gmail.com>
Authored: Tue Dec 9 16:32:03 2014 -0800
Committer: Jakob Homan <jg...@gmail.com>
Committed: Tue Dec 9 16:32:03 2014 -0800

----------------------------------------------------------------------
 tajo-project/pom.xml | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ac6799f0/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 14a81c3..eaa54b4 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -87,7 +87,7 @@
       <id>ereisman</id>
       <name>Eli Reisman</name>
       <email>ereisman@apache.org</email>
-      <organization>Hortonworks</organization>
+      <organization>Etsy</organization>
       <roles>
         <role>PMC</role>
       </roles>
@@ -136,7 +136,7 @@
       <id>jghoman</id>
       <name>Jakob Homan</name>
       <email>jghoman@apache.org</email>
-      <organization>LinkedIn</organization>
+      <organization>Microsoft</organization>
       <roles>
         <role>PMC</role>
       </roles>
@@ -166,7 +166,7 @@
       <id>mzhou</id>
       <name>Min Zhou</name>
       <email>mzhou@apache.org</email>
-      <organization>LinkedIn</organization>
+      <organization>Tango Me</organization>
       <roles>
         <role>Committer</role>
       </roles>
@@ -196,7 +196,7 @@
       <id>rsumbaly</id>
       <name>Roshan Sumbaly</name>
       <email>rsumbaly@apache.org</email>
-      <organization>LinkedIn</organization>
+      <organization>Coursera</organization>
       <roles>
         <role>PMC</role>
       </roles>
@@ -236,7 +236,7 @@
     </contributor>
     <contributor>
       <name>David Chen</name>
-      <organization>LinkedIn</organization>
+      <organization>Google</organization>
       <roles><role>Contributor</role></roles>
     </contributor>
     <contributor>


[4/5] tajo git commit: TAJO-1235: ByteBufLineReader can not read text line with CRLF. (jinho)

Posted by ji...@apache.org.
TAJO-1235: ByteBufLineReader can not read text line with CRLF. (jinho)

Closes #289


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/bebc7801
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/bebc7801
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/bebc7801

Branch: refs/heads/index_support
Commit: bebc78011798cd2fd691e8901141e94f1d445d6e
Parents: 72256fc
Author: jhkim <jh...@apache.org>
Authored: Wed Dec 10 11:25:03 2014 +0900
Committer: jhkim <jh...@apache.org>
Committed: Wed Dec 10 11:25:03 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../tajo/storage/ByteBufInputChannel.java       |  4 --
 .../tajo/storage/text/ByteBufLineReader.java    | 68 ++++++++++++--------
 .../tajo/storage/text/DelimitedLineReader.java  |  7 +-
 .../org/apache/tajo/storage/TestLineReader.java | 43 ++++++++++---
 5 files changed, 81 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b2985ba..710f463 100644
--- a/CHANGES
+++ b/CHANGES
@@ -90,6 +90,9 @@ Release 0.9.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1235: ByteBufLineReader can not read text line with CRLF.
+    (jinho)
+
     TAJO-1237: Fix missing maven-module for pullserver. (jinho)
 
     TAJO-1196: Unit test hangs occasionally and randomly. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
index b1b6d65..45fb1d8 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java
@@ -69,8 +69,4 @@ public class ByteBufInputChannel extends AbstractInterruptibleChannel implements
   protected void implCloseChannel() throws IOException {
     IOUtils.cleanup(null, channel, inputStream);
   }
-
-  public int available() throws IOException {
-    return inputStream.available();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
index 86319e1..2f742c6 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java
@@ -32,10 +32,11 @@ public class ByteBufLineReader implements Closeable {
 
   private int bufferSize;
   private long readBytes;
+  private int startIndex;
   private boolean eof = false;
   private ByteBuf buffer;
   private final ByteBufInputChannel channel;
-  private final AtomicInteger tempReadBytes = new AtomicInteger();
+  private final AtomicInteger lineReadBytes = new AtomicInteger();
   private final LineSplitProcessor processor = new LineSplitProcessor();
 
   public ByteBufLineReader(ByteBufInputChannel channel) {
@@ -53,10 +54,6 @@ public class ByteBufLineReader implements Closeable {
     return readBytes - buffer.readableBytes();
   }
 
-  public long available() throws IOException {
-    return channel.available() + buffer.readableBytes();
-  }
-
   @Override
   public void close() throws IOException {
     if (this.buffer.refCnt() > 0) {
@@ -66,7 +63,7 @@ public class ByteBufLineReader implements Closeable {
   }
 
   public String readLine() throws IOException {
-    ByteBuf buf = readLineBuf(tempReadBytes);
+    ByteBuf buf = readLineBuf(lineReadBytes);
     if (buf != null) {
       return buf.toString(CharsetUtil.UTF_8);
     }
@@ -77,24 +74,26 @@ public class ByteBufLineReader implements Closeable {
 
     int tailBytes = 0;
     if (this.readBytes > 0) {
+      //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes)
       this.buffer.markReaderIndex();
-      this.buffer.discardSomeReadBytes();  // compact the buffer
+      this.buffer.discardReadBytes();  // compact the buffer
       tailBytes = this.buffer.writerIndex();
       if (!this.buffer.isWritable()) {
         // a line bytes is large than the buffer
-        BufferPool.ensureWritable(buffer, bufferSize);
+        BufferPool.ensureWritable(buffer, bufferSize * 2);
         this.bufferSize = buffer.capacity();
       }
+      this.startIndex = 0;
     }
 
     boolean release = true;
     try {
       int readBytes = tailBytes;
       for (; ; ) {
-        int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes);
+        int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes);
         if (localReadBytes < 0) {
-          if (tailBytes == readBytes) {
-            // no more bytes are in the channel
+          if (buffer.isWritable()) {
+            //if fewer bytes were read than the buffer capacity, there are no more bytes in the channel
             eof = true;
           }
           break;
@@ -106,9 +105,8 @@ public class ByteBufLineReader implements Closeable {
       }
       this.readBytes += (readBytes - tailBytes);
       release = false;
-      if (!eof) {
-        this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
-      }
+
+      this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail)
     } finally {
       if (release) {
         buffer.release();
@@ -120,24 +118,36 @@ public class ByteBufLineReader implements Closeable {
    * Read a line terminated by one of CR, LF, or CRLF.
    */
   public ByteBuf readLineBuf(AtomicInteger reads) throws IOException {
-    if(eof) return null;
-
-    int startIndex = buffer.readerIndex();
-    int readBytes;
+    int readBytes = 0; // newline + text line bytes
+    int newlineLength = 0; //length of terminating newline
     int readable;
-    int newlineLength; //length of terminating newline
+
+    this.startIndex = buffer.readerIndex();
 
     loop:
     while (true) {
       readable = buffer.readableBytes();
       if (readable <= 0) {
-        buffer.readerIndex(startIndex);
+        buffer.readerIndex(this.startIndex);
         fillBuffer(); //compact and fill buffer
-        if (!buffer.isReadable()) {
+
+        //if buffer.writerIndex() is zero, there are no bytes in the buffer
+        if (!buffer.isReadable() && buffer.writerIndex() == 0) {
+          reads.set(0);
           return null;
         } else {
-          if (!eof) startIndex = 0; // reset the line start position
-          else startIndex = buffer.readerIndex();
+          //skip first newLine
+          if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
+            buffer.skipBytes(1);
+            if(eof && !buffer.isReadable()) {
+              reads.set(1);
+              return null;
+            }
+
+            newlineLength++;
+            readBytes++;
+            startIndex = buffer.readerIndex();
+          }
         }
         readable = buffer.readableBytes();
       }
@@ -147,19 +157,21 @@ public class ByteBufLineReader implements Closeable {
         //does not appeared terminating newline
         buffer.readerIndex(buffer.writerIndex()); // set to end buffer
         if(eof){
-          readBytes = buffer.readerIndex() - startIndex;
-          newlineLength = 0;
+          readBytes += (buffer.readerIndex() - startIndex);
           break loop;
         }
       } else {
         buffer.readerIndex(endIndex + 1);
-        readBytes = buffer.readerIndex() - startIndex;
+        readBytes += (buffer.readerIndex() - startIndex); //past newline + text line
+
+        //check whether the line is terminated by CRLF
         if (processor.isPrevCharCR() && buffer.isReadable()
             && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) {
           buffer.skipBytes(1);
-          newlineLength = 2;
+          readBytes++;
+          newlineLength += 2;
         } else {
-          newlineLength = 1;
+          newlineLength += 1;
         }
         break loop;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
index eb1929e..0efe030 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java
@@ -57,7 +57,7 @@ public class DelimitedLineReader implements Closeable {
   private long startOffset, end, pos;
   private boolean eof = true;
   private ByteBufLineReader lineReader;
-  private AtomicInteger tempReadBytes = new AtomicInteger();
+  private AtomicInteger lineReadBytes = new AtomicInteger();
   private FileFragment fragment;
   private Configuration conf;
 
@@ -122,11 +122,10 @@ public class DelimitedLineReader implements Closeable {
       return null;
     }
 
-    ByteBuf buf = lineReader.readLineBuf(tempReadBytes);
+    ByteBuf buf = lineReader.readLineBuf(lineReadBytes);
+    pos += lineReadBytes.get();
     if (buf == null) {
       eof = true;
-    } else {
-      pos += tempReadBytes.get();
     }
 
     if (!isCompressed() && getCompressedPosition() > end) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/bebc7801/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 4512d00..bfaba04 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -43,6 +44,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
@@ -84,18 +86,15 @@ public class TestLineReader {
     FileStatus status = fs.getFileStatus(tablePath);
 
     ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath));
-    assertEquals(status.getLen(), channel.available());
     ByteBufLineReader reader = new ByteBufLineReader(channel);
-    assertEquals(status.getLen(), reader.available());
 
     long totalRead = 0;
     int i = 0;
     AtomicInteger bytes = new AtomicInteger();
     for(;;){
       ByteBuf buf = reader.readLineBuf(bytes);
-      if(buf == null) break;
-
       totalRead += bytes.get();
+      if(buf == null) break;
       i++;
     }
     IOUtils.cleanup(null, reader, channel, fs);
@@ -171,18 +170,15 @@ public class TestLineReader {
     String data = FileUtil.readTextFile(file);
 
     ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file));
-
-    assertEquals(file.length(), channel.available());
     ByteBufLineReader reader = new ByteBufLineReader(channel);
-    assertEquals(file.length(), reader.available());
 
     long totalRead = 0;
     int i = 0;
     AtomicInteger bytes = new AtomicInteger();
     for(;;){
       ByteBuf buf = reader.readLineBuf(bytes);
-      if(buf == null) break;
       totalRead += bytes.get();
+      if(buf == null) break;
       i++;
     }
     IOUtils.cleanup(null, reader);
@@ -190,4 +186,35 @@ public class TestLineReader {
     assertEquals(file.length(), reader.readBytes());
     assertEquals(data.split("\n").length, i);
   }
+
+  @Test
+  public void testCRLFLine() throws IOException {
+    TajoConf conf = new TajoConf();
+    Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt");
+
+    FileSystem fs = testFile.getFileSystem(conf);
+    FSDataOutputStream outputStream = fs.create(testFile, true);
+    outputStream.write("0\r\n1\r\n".getBytes());
+    outputStream.flush();
+    IOUtils.closeStream(outputStream);
+
+    ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile));
+    ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2));
+    FileStatus status = fs.getFileStatus(testFile);
+
+    long totalRead = 0;
+    int i = 0;
+    AtomicInteger bytes = new AtomicInteger();
+    for(;;){
+      ByteBuf buf = reader.readLineBuf(bytes);
+      totalRead += bytes.get();
+      if(buf == null) break;
+      String row  = buf.toString(Charset.defaultCharset());
+      assertEquals(i, Integer.parseInt(row));
+      i++;
+    }
+    IOUtils.cleanup(null, reader);
+    assertEquals(status.getLen(), totalRead);
+    assertEquals(status.getLen(), reader.readBytes());
+  }
 }


[5/5] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support

Posted by ji...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/28151a96
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/28151a96
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/28151a96

Branch: refs/heads/index_support
Commit: 28151a965aacd3c064642047d22df2216c74bfeb
Parents: 761e372 bebc780
Author: Jihoon Son <ji...@apache.org>
Authored: Wed Dec 10 23:08:17 2014 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Wed Dec 10 23:08:17 2014 +0900

----------------------------------------------------------------------
 BUILDING                                        |  2 +-
 CHANGES                                         |  8 +++
 tajo-core/pom.xml                               |  2 +-
 tajo-dist/pom.xml                               |  2 +-
 tajo-project/pom.xml                            | 12 ++--
 tajo-pullserver/pom.xml                         |  4 +-
 .../tajo/storage/ByteBufInputChannel.java       |  4 --
 .../storage/parquet/TajoRecordConverter.java    | 34 ++++------
 .../tajo/storage/parquet/TajoWriteSupport.java  | 23 ++++---
 .../tajo/storage/text/ByteBufLineReader.java    | 68 ++++++++++++--------
 .../tajo/storage/text/DelimitedLineReader.java  |  7 +-
 .../org/apache/tajo/storage/TestLineReader.java | 43 ++++++++++---
 12 files changed, 122 insertions(+), 87 deletions(-)
----------------------------------------------------------------------