You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/11/28 10:07:51 UTC

[1/3] git commit: TAJO-332: Invalid row count of CSVScanner. (jinho)

Updated Branches:
  refs/heads/DAG-execplan 7c97735e1 -> 6532324b0


TAJO-332: Invalid row count of CSVScanner. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/b8435e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/b8435e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/b8435e7c

Branch: refs/heads/DAG-execplan
Commit: b8435e7c2dd9999023cca6040b33427dd886c2ae
Parents: 67e0d94
Author: jinossy <ji...@gmail.com>
Authored: Wed Nov 27 12:56:55 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Wed Nov 27 12:56:55 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../main/java/org/apache/tajo/util/Bytes.java   |  46 +-
 .../java/org/apache/tajo/storage/CSVFile.java   | 378 +++++--------
 .../java/org/apache/tajo/storage/LazyTuple.java |  76 +--
 .../org/apache/tajo/storage/LineReader.java     | 555 +++++++++++++++++++
 .../tajo/storage/TextSerializeDeserialize.java  |   4 +-
 .../tajo/storage/TestCompressionStorages.java   |   2 +-
 7 files changed, 721 insertions(+), 342 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1a824d..f60ee90 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,8 @@ Release 0.8.0 - unreleased
 
   BUG FIXES
 
+    TAJO-332: Invalid row count of CSVScanner. (jinho)
+
     TAJO-326: In ExecutionBlock, isRoot() and isLeafBlock() return invalid values. (jihoon)
 
     TAJO-296: Late registration of Tajo workers. (hyoungjunkim via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
index 299ed4c..cfabf21 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/Bytes.java
@@ -26,10 +26,7 @@ import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.io.WritableUtils;
 import sun.misc.Unsafe;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
 import java.lang.reflect.Field;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -1413,19 +1410,19 @@ public class Bytes {
   }
 
   public static byte[][] splitPreserveAllTokens(byte[] str, char separatorChar, int[] target) {
-    return splitWorker(str, 0, separatorChar, true, target);
+    return splitWorker(str, 0, -1, separatorChar, true, target);
   }
 
-  public static byte[][] splitPreserveAllTokens(byte[] str, int length, char separatorChar, int[] target) {
-    return splitWorker(str, length, separatorChar, true, target);
+  public static byte[][] splitPreserveAllTokens(byte[] str, int offset, int length, char separatorChar, int[] target) {
+    return splitWorker(str, offset, length, separatorChar, true, target);
   }
 
   public static byte[][] splitPreserveAllTokens(byte[] str, char separatorChar) {
-    return splitWorker(str, 0, separatorChar, true, null);
+    return splitWorker(str, 0, -1, separatorChar, true, null);
   }
 
   public static byte[][] splitPreserveAllTokens(byte[] str, int length, char separatorChar) {
-    return splitWorker(str, length, separatorChar, true, null);
+    return splitWorker(str, 0, length, separatorChar, true, null);
   }
 
   /**
@@ -1442,20 +1439,19 @@ public class Bytes {
    * separators are treated as one separator.
    * @return an array of parsed Strings, <code>null</code> if null String input
    */
-  private static byte[][] splitWorker(byte[] str, int length, char separatorChar, boolean preserveAllTokens, int[] target) {
+  private static byte[][] splitWorker(byte[] str, int offset, int length, char separatorChar, boolean preserveAllTokens, int[] target) {
     // Performance tuned for 2.0 (JDK1.4)
 
     if (str == null) {
       return null;
     }
     int len = length;
-    if(len < 1){
-      len = str.length;
-    }
-
     if (len == 0) {
       return new byte[1][0];
+    }else if(len < 0){
+      len = str.length - offset;
     }
+
     List list = new ArrayList();
     int i = 0, start = 0;
     boolean match = false;
@@ -1463,15 +1459,15 @@ public class Bytes {
     int currentTarget = 0;
     int currentIndex = 0;
     while (i < len) {
-      if (str[i] == separatorChar) {
+      if (str[i + offset] == separatorChar) {
         if (match || preserveAllTokens) {
           if (target == null) {
             byte[] bytes = new byte[i - start];
-            System.arraycopy(str, start, bytes, 0, bytes.length);
+            System.arraycopy(str, start + offset, bytes, 0, bytes.length);
             list.add(bytes);
           } else if (target.length > currentTarget && currentIndex == target[currentTarget]) {
             byte[] bytes = new byte[i - start];
-            System.arraycopy(str, start, bytes, 0, bytes.length);
+            System.arraycopy(str, start + offset, bytes, 0, bytes.length);
             list.add(bytes);
             currentTarget++;
           } else {
@@ -1491,11 +1487,11 @@ public class Bytes {
     if (match || (preserveAllTokens && lastMatch)) {
       if (target == null) {
         byte[] bytes = new byte[i - start];
-        System.arraycopy(str, start, bytes, 0, bytes.length);
+        System.arraycopy(str, start + offset, bytes, 0, bytes.length);
         list.add(bytes);
       } else if (target.length > currentTarget && currentIndex == target[currentTarget]) {
         byte[] bytes = new byte[i - start];
-        System.arraycopy(str, start, bytes, 0, bytes.length);
+        System.arraycopy(str, start + offset, bytes, 0, bytes.length);
         list.add(bytes); //str.substring(start, i));
         currentTarget++;
       } else {
@@ -1767,4 +1763,16 @@ public class Bytes {
     return toString(b, 0, n);
   }
 
+  public static int readFully(InputStream is, byte[] buffer, int offset, int length)
+      throws IOException {
+    int nread = 0;
+    while (nread < length) {
+      int nbytes = is.read(buffer, offset + nread, length - nread);
+      if (nbytes < 0) {
+        return nread > 0 ? nread : nbytes;
+      }
+      nread += nbytes;
+    }
+    return nread;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index a5228faf..e9b6cfc 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.storage;
 
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -27,29 +25,26 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.CharDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
 import org.apache.tajo.util.Bytes;
 
 import java.io.*;
+import java.util.ArrayList;
 import java.util.Arrays;
 
 public class CSVFile {
-  public static byte[] trueBytes = "true".getBytes();
-  public static byte[] falseBytes = "false".getBytes();
 
   public static final String DELIMITER = "csvfile.delimiter";
   public static final String NULL = "csvfile.null";     //read only
@@ -62,6 +57,7 @@ public class CSVFile {
   public static class CSVAppender extends FileAppender {
     private final TableMeta meta;
     private final Schema schema;
+    private final int columnNum;
     private final FileSystem fs;
     private FSDataOutputStream fos;
     private DataOutputStream outputStream;
@@ -73,7 +69,12 @@ public class CSVFile {
     private CompressionCodec codec;
     private Path compressedPath;
     private byte[] nullChars;
-    private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+    private int BUFFER_SIZE = 128 * 1024;
+    private int bufferedBytes = 0;
+    private long pos = 0;
+
+    private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
+    private TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
 
     public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
       super(conf, schema, meta, path);
@@ -81,7 +82,7 @@ public class CSVFile {
       this.meta = meta;
       this.schema = schema;
       this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0);
-
+      this.columnNum = schema.getColumnNum();
       String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
       if (StringUtils.isEmpty(nullCharacters)) {
         nullChars = NullDatum.get().asTextBytes();
@@ -119,177 +120,89 @@ public class CSVFile {
           throw new AlreadyExistsStorageException(path);
         }
         fos = fs.create(path);
-        outputStream = fos;
+        outputStream = new DataOutputStream(new BufferedOutputStream(fos));
       }
 
       if (enabledStats) {
         this.stats = new TableStatistics(this.schema);
       }
 
+      os.reset();
+      pos = fos.getPos();
+      bufferedBytes = 0;
       super.init();
     }
 
+
     @Override
     public void addTuple(Tuple tuple) throws IOException {
-      Column col;
       Datum datum;
+      int rowBytes = 0;
 
-      int colNum = schema.getColumnNum();
-      if (tuple instanceof LazyTuple) {
-        LazyTuple  lTuple = (LazyTuple)tuple;
-        for (int i = 0; i < colNum; i++) {
-          TajoDataTypes.DataType dataType = schema.getColumn(i).getDataType();
-
-          switch (dataType.getType()) {
-            case TEXT: {
-              datum = tuple.get(i);
-              if (datum instanceof NullDatum) {
-                outputStream.write(nullChars);
-              } else {
-                outputStream.write(datum.asTextBytes());
-              }
-              break;
-            }
-            case CHAR: {
-              datum = tuple.get(i);
-              if (datum instanceof NullDatum) {
-                outputStream.write(nullChars);
-              } else {
-                byte[] pad = new byte[dataType.getLength() - datum.size()];
-                outputStream.write(datum.asTextBytes());
-                outputStream.write(pad);
-              }
-              break;
-            }
-            case BOOLEAN: {
-              datum = tuple.get(i);
-              if (datum instanceof NullDatum) {
-                //null datum is zero length byte array
-              } else {
-                outputStream.write(datum.asBool() ? trueBytes : falseBytes);   //Compatibility with Apache Hive
-              }
-              break;
-            }
-            case NULL:
-              break;
-            case PROTOBUF:
-              datum = tuple.get(i);
-              ProtobufDatum protobufDatum = (ProtobufDatum) datum;
-              protobufJsonFormat.print(protobufDatum.get(), outputStream);
-              break;
-            default:
-              outputStream.write(lTuple.getTextBytes(i)); //better usage for insertion to table of lazy tuple
-              break;
-          }
-
-          if(colNum - 1 > i){
-            outputStream.write((byte) delimiter);
-          }
+      for (int i = 0; i < columnNum; i++) {
+        datum = tuple.get(i);
+        rowBytes += serializeDeserialize.serialize(schema.getColumn(i), datum, os, nullChars);
 
-          if (enabledStats) {
-            datum = tuple.get(i);
-            stats.analyzeField(i, datum);
-          }
+        if(columnNum - 1 > i){
+          os.write((byte) delimiter);
+          rowBytes += 1;
         }
-      } else {
-        for (int i = 0; i < schema.getColumnNum(); i++) {
-          datum = tuple.get(i);
-          if (enabledStats) {
-            stats.analyzeField(i, datum);
-          }
-          if (datum instanceof NullDatum) {
-            outputStream.write(nullChars);
-          } else {
-            col = schema.getColumn(i);
-            switch (col.getDataType().getType()) {
-              case BOOLEAN:
-                outputStream.write(tuple.getBoolean(i).asBool() ? trueBytes : falseBytes);   //Compatibility with Apache Hive
-                break;
-              case BIT:
-                outputStream.write(tuple.getByte(i).asTextBytes());
-                break;
-              case BLOB:
-                outputStream.write(Base64.encodeBase64(tuple.getBytes(i).asByteArray(), false));
-                break;
-              case CHAR:
-                CharDatum charDatum = tuple.getChar(i);
-                byte[] pad = new byte[col.getDataType().getLength() - datum.size()];
-                outputStream.write(charDatum.asTextBytes());
-                outputStream.write(pad);
-                break;
-              case TEXT:
-                outputStream.write(tuple.getText(i).asTextBytes());
-                break;
-              case INT2:
-                outputStream.write(tuple.getShort(i).asTextBytes());
-                break;
-              case INT4:
-                outputStream.write(tuple.getInt(i).asTextBytes());
-                break;
-              case INT8:
-                outputStream.write(tuple.getLong(i).asTextBytes());
-                break;
-              case FLOAT4:
-                outputStream.write(tuple.getFloat(i).asTextBytes());
-                break;
-              case FLOAT8:
-                outputStream.write(tuple.getDouble(i).asTextBytes());
-                break;
-              case INET4:
-                outputStream.write(tuple.getIPv4(i).asTextBytes());
-                break;
-              case INET6:
-                outputStream.write(tuple.getIPv6(i).toString().getBytes());
-                break;
-              case PROTOBUF:
-                ProtobufDatum protobuf = (ProtobufDatum) datum;
-                ProtobufJsonFormat.getInstance().print(protobuf.get(), outputStream);
-                break;
-              default:
-                throw new UnsupportedOperationException("Cannot write such field: "
-                    + tuple.get(i).type());
-            }
-          }
-          if(colNum - 1 > i){
-            outputStream.write((byte) delimiter);
-          }
+        if (enabledStats) {
+          stats.analyzeField(i, datum);
         }
       }
+      os.write(LF);
+      rowBytes += 1;
+
+      pos += rowBytes;
+      bufferedBytes += rowBytes;
+      if(bufferedBytes > BUFFER_SIZE){
+        flushBuffer();
+      }
       // Statistical section
-      outputStream.write('\n');
       if (enabledStats) {
         stats.incrementRow();
       }
     }
 
+    private void flushBuffer() throws IOException {
+      if(os.getLength() > 0) {
+        os.writeTo(outputStream);
+        os.reset();
+        bufferedBytes = 0;
+      }
+    }
     @Override
     public long getOffset() throws IOException {
-      return fos.getPos();
+      return pos;
     }
 
     @Override
     public void flush() throws IOException {
+      flushBuffer();
       outputStream.flush();
     }
 
     @Override
     public void close() throws IOException {
-      // Statistical section
-      if (enabledStats) {
-        stats.setNumBytes(getOffset());
-      }
 
       try {
         flush();
 
+        // Statistical section
+        if (enabledStats) {
+          stats.setNumBytes(getOffset());
+        }
+
         if(deflateFilter != null) {
           deflateFilter.finish();
           deflateFilter.resetState();
           deflateFilter = null;
         }
 
-        fos.close();
+        os.close();
       } finally {
+        IOUtils.cleanup(LOG, fos);
         if (compressor != null) {
           CodecPool.returnCompressor(compressor);
           compressor = null;
@@ -322,11 +235,10 @@ public class CSVFile {
       factory = new CompressionCodecFactory(conf);
       codec = factory.getCodec(fragment.getPath());
       if (codec == null || codec instanceof SplittableCompressionCodec) {
-          splittable = true;
+        splittable = true;
       }
 
-      // Buffer size, Delimiter
-      this.bufSize = DEFAULT_BUFFER_SIZE;
+      //Delimiter
       String delim  = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
       this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
 
@@ -339,7 +251,6 @@ public class CSVFile {
     }
 
     private final static int DEFAULT_BUFFER_SIZE = 128 * 1024;
-    private int bufSize;
     private char delimiter;
     private FileSystem fs;
     private FSDataInputStream fis;
@@ -349,28 +260,28 @@ public class CSVFile {
     private Decompressor decompressor;
     private Seekable filePosition;
     private boolean splittable = false;
-    private long startOffset, length;
-    private byte[] buf = null;
-    private byte[][] tuples = null;
-    private long[] tupleOffsets = null;
+    private long startOffset, length, end, pos;
     private int currentIdx = 0, validIdx = 0;
-    private byte[] tail = null;
-    private long pageStart = -1;
-    private long prevTailLen = -1;
     private int[] targetColumnIndexes;
     private boolean eof = false;
     private final byte[] nullChars;
+    private LineReader reader;
+    private ArrayList<Long> fileOffsets = new ArrayList<Long>();
+    private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
+    private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
+    private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
 
     @Override
     public void init() throws IOException {
 
       // FileFragment information
-      fs = fragment.getPath().getFileSystem(conf);
-      fis = fs.open(fragment.getPath());
-      startOffset = fragment.getStartKey();
-      length = fragment.getEndKey();
+      if(fs == null) fs = fragment.getPath().getFileSystem(conf);
+      if(fis == null) fis = fs.open(fragment.getPath());
 
-      if(startOffset > 0) startOffset--; // prev line feed
+      pos = startOffset = fragment.getStartKey();
+      length = fragment.getEndKey();
+      end = startOffset + length;
+      fis.seek(startOffset);
 
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
@@ -385,14 +296,14 @@ public class CSVFile {
           is = cIn;
         } else {
           is = new DataInputStream(codec.createInputStream(fis, decompressor));
+          filePosition = fis;
         }
       } else {
-        fis.seek(startOffset);
         filePosition = fis;
         is = fis;
       }
 
-      tuples = new byte[0][];
+      reader = new LineReader(is, DEFAULT_BUFFER_SIZE);
       if (targets == null) {
         targets = schema.toArray();
       }
@@ -410,103 +321,61 @@ public class CSVFile {
       }
 
       if (startOffset != 0) {
-        int rbyte;
-        while ((rbyte = is.read()) != LF) {
-          if(rbyte == EOF) break;
-        }
-      }
-
-      if (fragmentable() < 1) {
-        close();
-        return;
+        pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
       }
+      eof = false;
       page();
     }
 
+    private int maxBytesToConsume(long pos) {
+      return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
+    }
+
     private long fragmentable() throws IOException {
-      return startOffset + length - getFilePosition();
+      return end - getFilePosition();
     }
 
     private long getFilePosition() throws IOException {
       long retVal;
-      if (filePosition != null) {
+      if (isCompress()) {
         retVal = filePosition.getPos();
       } else {
-        retVal = fis.getPos();
+        retVal = pos;
       }
       return retVal;
     }
 
     private void page() throws IOException {
-      // Index initialization
+//      // Index initialization
       currentIdx = 0;
+      validIdx = 0;
+      int currentBufferPos = 0;
+      int bufferedSize = 0;
 
-      // Buffer size set
-      if (isSplittable() &&  fragmentable() < DEFAULT_BUFFER_SIZE) {
-        bufSize = (int)fragmentable();
-      }
-
-      if (this.tail == null || this.tail.length == 0) {
-        this.pageStart = getFilePosition();
-        this.prevTailLen = 0;
-      } else {
-        this.pageStart = getFilePosition() - this.tail.length;
-        this.prevTailLen = this.tail.length;
-      }
+      buffer.reset();
+      startOffsets.clear();
+      rowLengthList.clear();
+      fileOffsets.clear();
 
-      // Read
-      int rbyte;
-      buf = new byte[bufSize];
-      rbyte = is.read(buf);
+      if(eof) return;
 
-      if (prevTailLen == 0) {
-        if(rbyte == EOF){
-          eof = true; //EOF
-          return;
-        }
+      while (DEFAULT_BUFFER_SIZE > bufferedSize){
 
-        tail = new byte[0];
-        tuples = Bytes.splitPreserveAllTokens(buf, rbyte, (char) LF);
-      } else {
-        byte[] lastRow = ArrayUtils.addAll(tail, buf);
-        tuples = Bytes.splitPreserveAllTokens(lastRow, rbyte + tail.length, (char) LF);
-        tail = null;
-      }
-
-      // Check tail
-      if ((char) buf[rbyte - 1] != LF) {
-        // splittable bzip2 compression returned 1 byte when sync maker found
-        if (isSplittable() && (fragmentable() < 1 || rbyte != bufSize)) {
-          int lineFeedPos = 0;
-          byte[] temp = new byte[DEFAULT_BUFFER_SIZE];
-
-          // find line feed
-          while ((temp[lineFeedPos] = (byte)is.read()) != (byte)LF) {
-            lineFeedPos++;
-          }
-
-          tuples[tuples.length - 1] = ArrayUtils.addAll(tuples[tuples.length - 1],
-              ArrayUtils.subarray(temp, 0, lineFeedPos));
-          validIdx = tuples.length;
+        int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
 
+        if(ret <= 0){
+          eof = true;
+          break;
         } else {
-          tail = tuples[tuples.length - 1];
-          validIdx = tuples.length - 1;
+          fileOffsets.add(pos);
+          pos += ret;
+          startOffsets.add(currentBufferPos);
+          currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
+          bufferedSize += ret;
+          validIdx++;
         }
-      } else {
-        tail = new byte[0];
-        validIdx = tuples.length - 1;   //remove last empty row ( .... \n .... \n  length is 3)
-      }
 
-     if(!isCompress()) makeTupleOffset();
-    }
-
-    private void makeTupleOffset() {
-      long curTupleOffset = 0;
-      this.tupleOffsets = new long[this.validIdx];
-      for (int i = 0; i < this.validIdx; i++) {
-        this.tupleOffsets[i] = curTupleOffset + this.pageStart;
-        curTupleOffset += this.tuples[i].length + 1;//tuple byte +  1byte line feed
+        if(isSplittable() && getFilePosition() > end) break;
       }
     }
 
@@ -514,31 +383,31 @@ public class CSVFile {
     public Tuple next() throws IOException {
       try {
         if (currentIdx == validIdx) {
-          if (isSplittable() && fragmentable() < 1) {
-            close();
+          if (isSplittable() && fragmentable() <= 0) {
             return null;
           } else {
             page();
-          }
 
-          if(eof){
-            close();
-            return null;
+            if(currentIdx == validIdx){
+              return null;
+            }
           }
         }
 
         long offset = -1;
         if(!isCompress()){
-          offset = this.tupleOffsets[currentIdx];
+          offset = fileOffsets.get(currentIdx);
         }
 
-        byte[][] cells = Bytes.splitPreserveAllTokens(tuples[currentIdx++], delimiter, targetColumnIndexes);
+        byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
+            rowLengthList.get(currentIdx),  delimiter, targetColumnIndexes);
+        currentIdx++;
         return new LazyTuple(schema, cells, offset, nullChars);
       } catch (Throwable t) {
-        LOG.error("Tuple list length: " + (tuples != null ? tuples.length : 0), t);
+        LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
         LOG.error("Tuple list current index: " + currentIdx, t);
+        throw new IOException(t);
       }
-      return null;
     }
 
     private boolean isCompress() {
@@ -547,13 +416,20 @@ public class CSVFile {
 
     @Override
     public void reset() throws IOException {
+      if (decompressor != null) {
+        decompressor.reset();
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+
       init();
     }
 
     @Override
     public void close() throws IOException {
       try {
-        is.close();
+        IOUtils.cleanup(LOG, is, fis);
+        fs = null;
       } finally {
         if (decompressor != null) {
           decompressor.reset();
@@ -581,26 +457,25 @@ public class CSVFile {
     public void seek(long offset) throws IOException {
       if(isCompress()) throw new UnsupportedException();
 
-      int tupleIndex = Arrays.binarySearch(this.tupleOffsets, offset);
+      int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset);
+
       if (tupleIndex > -1) {
         this.currentIdx = tupleIndex;
-      } else if (isSplittable() && offset >= this.pageStart + this.bufSize
-          + this.prevTailLen - this.tail.length || offset <= this.pageStart) {
-        filePosition.seek(offset);
-        tail = new byte[0];
-        buf = new byte[DEFAULT_BUFFER_SIZE];
-        bufSize = DEFAULT_BUFFER_SIZE;
+      } else if (isSplittable() && end >= offset || startOffset <= offset) {
+        eof = false;
+        fis.seek(offset);
+        pos = offset;
+        reader.reset();
         this.currentIdx = 0;
         this.validIdx = 0;
         // pageBuffer();
       } else {
         throw new IOException("invalid offset " +
-            " < pageStart : " +  this.pageStart + " , " +
-            "  pagelength : " + this.bufSize + " , " +
-            "  tail lenght : " + this.tail.length +
+            " < start : " +  startOffset + " , " +
+            "  end : " + end + " , " +
+            "  filePos : " + filePosition.getPos() + " , " +
             "  input offset : " + offset + " >");
       }
-
     }
 
     @Override
@@ -608,13 +483,14 @@ public class CSVFile {
       if(isCompress()) throw new UnsupportedException();
 
       if (this.currentIdx == this.validIdx) {
-        if (fragmentable() < 1) {
+        if (fragmentable() <= 0) {
           return -1;
         } else {
           page();
+          if(currentIdx == validIdx) return -1;
         }
       }
-      return this.tupleOffsets[currentIdx];
+      return fileOffsets.get(currentIdx);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index a395ea4..50bc65c 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -18,14 +18,9 @@
 
 package org.apache.tajo.storage;
 
-import com.google.protobuf.Message;
-import org.apache.commons.codec.binary.Base64;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.datum.exception.InvalidCastException;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.util.Bytes;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -37,6 +32,7 @@ public class LazyTuple implements Tuple {
   private byte[][] textBytes;
   private Schema schema;
   private byte[] nullBytes;
+  private static TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
 
   public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
     this(schema, textBytes, offset, NullDatum.get().asTextBytes());
@@ -123,7 +119,12 @@ public class LazyTuple implements Tuple {
     else if (textBytes.length <= fieldId) {
       values[fieldId] = NullDatum.get();  // split error. (col : 3, separator: ',', row text: "a,")
     } else if (textBytes[fieldId] != null) {
-      values[fieldId] = createByTextBytes(schema.getColumn(fieldId).getDataType(), textBytes[fieldId]);
+      try {
+        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
+            textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
+      } catch (IOException e) {
+        values[fieldId] = NullDatum.get();
+      }
       textBytes[fieldId] = null;
     } else {
       //non-projection
@@ -285,67 +286,4 @@ public class LazyTuple implements Tuple {
     }
     return false;
   }
-
-
-  public  boolean isNull(byte[] val){
-    return val == null || val.length == 0 || ((val.length == nullBytes.length) && Bytes.equals(val, nullBytes));
-  }
-
-  public  boolean isNullText(byte[] val){
-    return val == null || (val.length > 0 && val.length == nullBytes.length && Bytes.equals(val, nullBytes));
-  }
-
-  public boolean isNotNull(byte[] val){
-    return !isNull(val);
-  }
-
-  public boolean isNotNullText(byte[] val){
-    return !isNullText((val));
-  }
-
-  private  Datum createByTextBytes(TajoDataTypes.DataType type, byte [] val) {
-    switch (type.getType()) {
-      case BOOLEAN:
-        return isNotNull(val) ? DatumFactory.createBool(val[0] == 't' || val[0] == 'T') : NullDatum.get();
-      case INT2:
-        return isNotNull(val) ? DatumFactory.createInt2(new String(val)) : NullDatum.get();
-      case INT4:
-        return isNotNull(val) ? DatumFactory.createInt4(new String(val)) : NullDatum.get();
-      case INT8:
-        return isNotNull(val) ? DatumFactory.createInt8(new String(val)) : NullDatum.get();
-      case FLOAT4:
-        return isNotNull(val) ? DatumFactory.createFloat4(new String(val)) : NullDatum.get();
-      case FLOAT8:
-        return isNotNull(val) ? DatumFactory.createFloat8(new String(val)) : NullDatum.get();
-      case CHAR:
-        return isNotNullText(val) ? DatumFactory.createChar(new String(val).trim()) : NullDatum.get();
-      case TEXT:
-        return isNotNullText(val) ? DatumFactory.createText(val) : NullDatum.get();
-      case BIT:
-        return DatumFactory.createBit(Byte.parseByte(new String(val)));
-      case BLOB:
-        return DatumFactory.createBlob(Base64.decodeBase64(val));
-      case INET4:
-        return isNotNull(val) ? DatumFactory.createInet4(new String(val)) : NullDatum.get();
-      case PROTOBUF: {
-        if (isNotNull(val)) {
-          ProtobufDatumFactory factory = ProtobufDatumFactory.get(type);
-          Message.Builder builder = factory.newBuilder();
-          try {
-            ProtobufJsonFormat.getInstance().merge(val, builder);
-            return factory.createDatum(builder.build());
-          } catch (IOException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        } else {
-          return NullDatum.get();
-        }
-      }
-      case NULL:
-        return NullDatum.get();
-      default:
-        throw new UnsupportedOperationException(type.toString());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
new file mode 100644
index 0000000..f48c482
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * A class that provides a line reader from an input stream.
+ * Depending on the constructor used, lines will either be terminated by:
+ * <ul>
+ * <li>one of the following: '\n' (LF) , '\r' (CR),
+ * or '\r\n' (CR+LF).</li>
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated
+ * line.
+ */
+
+public class LineReader implements Closeable {
+  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private int bufferSize = DEFAULT_BUFFER_SIZE;
+  private InputStream in;
+  private byte[] buffer;
+  // the number of bytes of real data in the buffer
+  private int bufferLength = 0;
+  // the current position in the buffer
+  private int bufferPosn = 0;
+
+  private static final byte CR = '\r';
+  private static final byte LF = '\n';
+
+  // The line delimiter
+  private final byte[] recordDelimiterBytes;
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size (64k).
+   *
+   * @param in The input stream
+   * @throws IOException
+   */
+  public LineReader(InputStream in) {
+    this(in, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size.
+   *
+   * @param in         The input stream
+   * @param bufferSize Size of the read buffer
+   * @throws IOException
+   */
+  public LineReader(InputStream in, int bufferSize) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = null;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>.
+   *
+   * @param in   input stream
+   * @param conf configuration
+   * @throws IOException
+   */
+  public LineReader(InputStream in, Configuration conf) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   The input stream
+   * @param recordDelimiterBytes The delimiter
+   */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = DEFAULT_BUFFER_SIZE;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   The input stream
+   * @param bufferSize           Size of the read buffer
+   * @param recordDelimiterBytes The delimiter
+   * @throws IOException
+   */
+  public LineReader(InputStream in, int bufferSize,
+                    byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   input stream
+   * @param conf                 configuration
+   * @param recordDelimiterBytes The delimiter
+   * @throws IOException
+   */
+  public LineReader(InputStream in, Configuration conf,
+                    byte[] recordDelimiterBytes) throws IOException {
+    this.in = in;
+    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+
+  /**
+   * Close the underlying stream.
+   *
+   * @throws IOException
+   */
+  public void close() throws IOException {
+    in.close();
+  }
+
+  public void reset() {
+    bufferLength = 0;
+    bufferPosn = 0;
+
+  }
+
+  /**
+   * Read one line from the InputStream into the given Text.
+   *
+   * @param str               the object to store the given line (without newline)
+   * @param maxLineLength     the maximum number of bytes to store into str;
+   *                          the rest of the line is silently discarded.
+   * @param maxBytesToConsume the maximum number of bytes to consume
+   *                          in this call.  This is only a hint, because if the line cross
+   *                          this threshold, we allow it to happen.  It can overshoot
+   *                          potentially by as much as one buffer length.
+   * @return the number of bytes read including the (longest) newline
+   *         found.
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength,
+                      int maxBytesToConsume) throws IOException {
+    if (this.recordDelimiterBytes != null) {
+      return readCustomLine(str, maxLineLength, maxBytesToConsume);
+    } else {
+      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
+    }
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need consume LF as well, so next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
+    str.clear();
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true of prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+      , int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need consume LF as well, so next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
+
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true of prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.write(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+
+    if (bytesConsumed > 0) offsets.add(txtLength);
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+
+/*  int validIdx = 0;
+  public int readDefaultLines(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, ArrayList<Long> foffsets,
+                             long pos, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    *//* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need consume LF as well, so next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     *//*
+    //str.clear();
+    str.reset();
+    offsets.clear();
+    foffsets.clear();
+
+    validIdx = 0;
+    long bufferBytesConsumed = 0;
+
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true of prev char was CR
+    long bytesConsumed = 0;
+    do {
+
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+
+      if (appendLength > 0) {
+        str.write(buffer, startPosn, appendLength);
+        //System.out.println(startPosn + "," + appendLength);
+        //str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+
+      if(newlineLength > 0){
+        validIdx++;
+
+        if (bytesConsumed > (long)Integer.MAX_VALUE) {
+          throw new IOException("Too many bytes before newline: " + bytesConsumed);
+        }
+        offsets.add(txtLength);
+        foffsets.add(pos);
+        pos+= bytesConsumed;
+        bufferBytesConsumed += bytesConsumed;
+
+        txtLength = 0;
+        newlineLength = 0;
+        prevCharCR = false; //true of prev char was CR
+        bytesConsumed = 0;
+      } else {
+        bufferBytesConsumed += bytesConsumed;
+        bytesConsumed = 0;
+      }
+    } while ((bufferBytesConsumed < 256 * 1024));
+
+    return (int)bufferBytesConsumed;
+  }*/
+
+  /**
+   * Read a line terminated by a custom delimiter.
+   */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+   /* We're reading data from inputStream, but the head of the stream may be
+    *  already captured in the previous buffer, so we have several cases:
+    *
+    * 1. The buffer tail does not contain any character sequence which
+    *    matches with the head of delimiter. We count it as a
+    *    ambiguous byte count = 0
+    *
+    * 2. The buffer tail contains a X number of characters,
+    *    that forms a sequence, which matches with the
+    *    head of delimiter. We count ambiguous byte count = X
+    *
+    *    // ***  eg: A segment of input file is as follows
+    *
+    *    " record 1792: I found this bug very interesting and
+    *     I have completely read about it. record 1793: This bug
+    *     can be solved easily record 1794: This ."
+    *
+    *    delimiter = "record";
+    *
+    *    supposing:- String at the end of buffer =
+    *    "I found this bug very interesting and I have completely re"
+    *    There for next buffer = "ad about it. record 179       ...."
+    *
+    *     The matching characters in the input
+    *     buffer tail and delimiter head = "re"
+    *     Therefore, ambiguous byte count = 2 ****   //
+    *
+    *     2.1 If the following bytes are the remaining characters of
+    *         the delimiter, then we have to capture only up to the starting
+    *         position of delimiter. That means, we need not include the
+    *         ambiguous characters in str.
+    *
+    *     2.2 If the following bytes are not the remaining characters of
+    *         the delimiter ( as mentioned in the example ),
+    *         then we have to include the ambiguous characters in str.
+    */
+    str.clear();
+    int txtLength = 0; // tracks str.getLength(), as an optimization
+    long bytesConsumed = 0;
+    int delPosn = 0;
+    int ambiguousByteCount = 0; // To capture the ambiguous characters count
+    do {
+      int startPosn = bufferPosn; // Start from previous end position
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        bufferLength = in.read(buffer);
+        if (bufferLength <= 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
+          delPosn++;
+          if (delPosn >= recordDelimiterBytes.length) {
+            bufferPosn++;
+            break;
+          }
+        } else if (delPosn != 0) {
+          bufferPosn--;
+          delPosn = 0;
+        }
+      }
+      int readLength = bufferPosn - startPosn;
+      bytesConsumed += readLength;
+      int appendLength = readLength - delPosn;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        if (ambiguousByteCount > 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          //appending the ambiguous characters (refer case 2.2)
+          bytesConsumed += ambiguousByteCount;
+          ambiguousByteCount = 0;
+        }
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+      if (bufferPosn >= bufferLength) {
+        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
+          ambiguousByteCount = delPosn;
+          bytesConsumed -= ambiguousByteCount; //to be consumed in next
+        }
+      }
+    } while (delPosn < recordDelimiterBytes.length
+        && bytesConsumed < maxBytesToConsume);
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str           the object to store the given line
+   * @param maxLineLength the maximum number of bytes to store into str.
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength) throws IOException {
+    return readLine(str, maxLineLength, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line
+   * @return the number of bytes read including the newline
+   * @throws IOException if the underlying stream throws
+   */
+  public int readLine(Text str) throws IOException {
+    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
index f7e92a2..64b078c 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
@@ -37,8 +37,7 @@ public class TextSerializeDeserialize implements SerializeDeserialize {
 
 
   @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
-      throws IOException {
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
 
     byte[] bytes;
     int length = 0;
@@ -178,6 +177,7 @@ public class TextSerializeDeserialize implements SerializeDeserialize {
       }
       default:
         datum = NullDatum.get();
+        break;
     }
     return datum;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/b8435e7c/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 3c47c10..08a181a 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -228,6 +228,6 @@ public class TestCompressionStorages {
       tupleCnt++;
     }
     scanner.close();
-    assertEquals(tupleCnt, tupleNum);
+    assertEquals(tupleNum, tupleCnt);
   }
 }


[3/3] git commit: Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/incubator-tajo into DAG-execplan

Posted by ji...@apache.org.
Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/incubator-tajo into DAG-execplan


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/6532324b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/6532324b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/6532324b

Branch: refs/heads/DAG-execplan
Commit: 6532324b046f15a0a120b53c1ccd6369b97069d9
Parents: 7c97735 1f80cd2
Author: Jihoon Son <ji...@apache.org>
Authored: Thu Nov 28 18:07:06 2013 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Thu Nov 28 18:07:06 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 .../main/java/org/apache/tajo/util/Bytes.java   |  46 +-
 .../tajo/engine/function/string/Length.java     |  51 ++
 .../java/org/apache/tajo/master/TajoMaster.java |   5 +
 .../TestStringOperatorsAndFunctions.java        |  12 +
 .../java/org/apache/tajo/storage/CSVFile.java   | 378 +++++--------
 .../java/org/apache/tajo/storage/LazyTuple.java |  76 +--
 .../org/apache/tajo/storage/LineReader.java     | 555 +++++++++++++++++++
 .../tajo/storage/TextSerializeDeserialize.java  |   4 +-
 .../tajo/storage/TestCompressionStorages.java   |   2 +-
 10 files changed, 791 insertions(+), 342 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: TAJO-308: Implement length(string) function. (hyoungjunkim via hyunsik)

Posted by ji...@apache.org.
TAJO-308: Implement length(string) function. (hyoungjunkim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/1f80cd24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/1f80cd24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/1f80cd24

Branch: refs/heads/DAG-execplan
Commit: 1f80cd24b7d1005aa6874143c827bde72d55da84
Parents: b8435e7
Author: Hyunsik Choi <hy...@apache.org>
Authored: Thu Nov 28 14:01:00 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Nov 28 14:01:00 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../tajo/engine/function/string/Length.java     | 51 ++++++++++++++++++++
 .../java/org/apache/tajo/master/TajoMaster.java |  5 ++
 .../TestStringOperatorsAndFunctions.java        | 12 +++++
 4 files changed, 70 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f60ee90..0a7dd47 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.8.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-308: Implement length(string) function. (hyoungjunkim via hyunsik)
+
     TAJO-200: RCFile compatible to apache hive. (jinho)
 
     TAJO-176: Implement Tajo JDBC Driver. (Keuntae Park via jihoon)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java
new file mode 100644
index 0000000..5c8b689
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/string/Length.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.string;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.storage.Tuple;
+
+/**
+ * Function definition
+ *
+ * INT4 length(string text)
+ */
+public class Length  extends GeneralFunction {
+
+  public Length() {
+    super(new Column[] {
+        new Column("text", TajoDataTypes.Type.TEXT)
+    });
+  }
+
+  @Override
+  public Datum eval(Tuple params) {
+    Datum datum = params.get(0);
+    if(datum instanceof NullDatum) {
+      return NullDatum.get();
+    }
+
+    return DatumFactory.createInt4(datum.asChars().length());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
index f4118bb..b634f25 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -394,6 +394,11 @@ public class TajoMaster extends CompositeService {
             CatalogUtil.newSimpleDataType(Type.TEXT),
             CatalogUtil.newSimpleDataTypeArray(Type.TEXT, Type.TEXT, Type.TEXT)));
 
+    sqlFuncs.add(
+        new FunctionDesc("length", Length.class, FunctionType.GENERAL,
+            CatalogUtil.newSimpleDataType(Type.INT4),
+            CatalogUtil.newSimpleDataTypeArray(Type.TEXT)));
+
     return sqlFuncs;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/1f80cd24/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
index 8d259ab..5124173 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/function/TestStringOperatorsAndFunctions.java
@@ -178,4 +178,16 @@ public class TestStringOperatorsAndFunctions extends ExprTestBase {
     testEval(schema, "table1", "ABC,DEF,3.14", "select character_length(lower(col1) || lower(col2)) from table1",
         new String[]{"6"});
   }
+
+  @Test
+  public void testLength() throws IOException {
+    testSimpleEval("select length('123456') as col1 ", new String[]{"6"});
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", TEXT);
+    schema.addColumn("col2", TEXT);
+    schema.addColumn("col3", TEXT);
+    testEval(schema, "table1", "ABC,DEF,3.14", "select length(lower(col1) || lower(col2)) from table1",
+        new String[]{"6"});
+  }
 }