You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/12/18 07:03:05 UTC

git commit: TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)

Updated Branches:
  refs/heads/master 775cfaaad -> 2c53ccc11


TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/2c53ccc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/2c53ccc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/2c53ccc1

Branch: refs/heads/master
Commit: 2c53ccc11ffc94bbbddfaa4fe1831b12405ff229
Parents: 775cfaa
Author: jinossy <ji...@gmail.com>
Authored: Wed Dec 18 15:01:24 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Wed Dec 18 15:01:24 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../storage/BinarySerializeDeserialize.java     | 257 -------------------
 .../storage/BinarySerializerDeserializer.java   | 257 +++++++++++++++++++
 .../java/org/apache/tajo/storage/CSVFile.java   |  77 ++++--
 .../tajo/storage/CompressedSplitLineReader.java | 182 +++++++++++++
 .../java/org/apache/tajo/storage/LazyTuple.java |   7 +-
 .../org/apache/tajo/storage/LineReader.java     |  10 +-
 .../tajo/storage/SerializeDeserialize.java      |  34 ---
 .../tajo/storage/SerializerDeserializer.java    |  34 +++
 .../apache/tajo/storage/SplitLineReader.java    |  39 +++
 .../tajo/storage/TextSerializeDeserialize.java  | 204 ---------------
 .../storage/TextSerializerDeserializer.java     | 204 +++++++++++++++
 .../org/apache/tajo/storage/rcfile/RCFile.java  |  12 +-
 .../apache/tajo/storage/v2/RCFileScanner.java   |  11 +-
 .../tajo/storage/TestCompressionStorages.java   |   2 +-
 .../org/apache/tajo/storage/TestLazyTuple.java  |   4 +-
 .../org/apache/tajo/storage/TestStorages.java   |   4 +-
 17 files changed, 797 insertions(+), 543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 341787c..83f39ab 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-424: Make serializer/deserializer configurable in CSVFile. (jinho)
+
     TAJO-419: Add missing visitor methods of AlgebraVisitor and
     BaseAlgebraVisitor. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java
deleted file mode 100644
index 27de655..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializeDeserialize.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-public class BinarySerializeDeserialize implements SerializeDeserialize {
-
-  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
-
-  @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
-      throws IOException {
-    byte[] bytes;
-    int length = 0;
-    if (datum == null || datum instanceof NullDatum) {
-      return 0;
-    }
-
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-      case BIT:
-      case CHAR:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case INT2:
-        length = writeShort(out, datum.asInt2());
-        break;
-      case INT4:
-        length = writeVLong(out, datum.asInt4());
-        break;
-      case INT8:
-        length = writeVLong(out, datum.asInt8());
-        break;
-      case FLOAT4:
-        length = writeFloat(out, datum.asFloat4());
-        break;
-      case FLOAT8:
-        length = writeDouble(out, datum.asFloat8());
-        break;
-      case TEXT: {
-        bytes = datum.asTextBytes();
-        length = datum.size();
-        if (length == 0) {
-          bytes = INVALID_UTF__SINGLE_BYTE;
-          length = INVALID_UTF__SINGLE_BYTE.length;
-        }
-        out.write(bytes, 0, bytes.length);
-        break;
-      }
-      case BLOB:
-      case INET4:
-      case INET6:
-        bytes = datum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
-        bytes = protobufDatum.asByteArray();
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case NULL_TYPE:
-        break;
-      default:
-        throw new IOException("Does not support type");
-    }
-    return length;
-  }
-
-  @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
-    if (length == 0) return NullDatum.get();
-
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        datum = DatumFactory.createBool(bytes[offset]);
-        break;
-      case BIT:
-        datum = DatumFactory.createBit(bytes[offset]);
-        break;
-      case CHAR: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-        datum = DatumFactory.createChar(chars);
-        break;
-      }
-      case INT2:
-        datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
-        break;
-      case INT4:
-        datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
-        break;
-      case INT8:
-        datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
-        break;
-      case FLOAT4:
-        datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
-        break;
-      case FLOAT8:
-        datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
-        break;
-      case TEXT: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-
-        if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
-          datum = DatumFactory.createText(new byte[0]);
-        } else {
-          datum = DatumFactory.createText(chars);
-        }
-        break;
-      }
-      case PROTOBUF: {
-        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
-        Message.Builder builder = factory.newBuilder();
-        builder.mergeFrom(bytes, offset, length);
-        datum = factory.createDatum(builder);
-        break;
-      }
-      case INET4:
-        datum = DatumFactory.createInet4(bytes, offset, length);
-        break;
-      case BLOB:
-        datum = DatumFactory.createBlob(bytes, offset, length);
-        break;
-      default:
-        datum = NullDatum.get();
-    }
-    return datum;
-  }
-
-  private byte[] shortBytes = new byte[2];
-
-  public int writeShort(OutputStream out, short val) throws IOException {
-    shortBytes[0] = (byte) (val >> 8);
-    shortBytes[1] = (byte) val;
-    out.write(shortBytes, 0, 2);
-    return 2;
-  }
-
-  public float toFloat(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 4);
-
-    int val = ((bytes[offset] & 0x000000FF) << 24) +
-        ((bytes[offset + 1] & 0x000000FF) << 16) +
-        ((bytes[offset + 2] & 0x000000FF) << 8) +
-        (bytes[offset + 3] & 0x000000FF);
-    return Float.intBitsToFloat(val);
-  }
-
-  private byte[] floatBytes = new byte[4];
-
-  public int writeFloat(OutputStream out, float f) throws IOException {
-    int val = Float.floatToIntBits(f);
-
-    floatBytes[0] = (byte) (val >> 24);
-    floatBytes[1] = (byte) (val >> 16);
-    floatBytes[2] = (byte) (val >> 8);
-    floatBytes[3] = (byte) val;
-    out.write(floatBytes, 0, 4);
-    return floatBytes.length;
-  }
-
-  public double toDouble(byte[] bytes, int offset, int length) {
-    Preconditions.checkArgument(length == 8);
-    long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
-        ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
-        ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
-        ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
-        ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
-        ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
-        ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
-        (long) (bytes[offset + 7] & 0x00000000000000FF);
-    return Double.longBitsToDouble(val);
-  }
-
-  private byte[] doubleBytes = new byte[8];
-
-  public int writeDouble(OutputStream out, double d) throws IOException {
-    long val = Double.doubleToLongBits(d);
-
-    doubleBytes[0] = (byte) (val >> 56);
-    doubleBytes[1] = (byte) (val >> 48);
-    doubleBytes[2] = (byte) (val >> 40);
-    doubleBytes[3] = (byte) (val >> 32);
-    doubleBytes[4] = (byte) (val >> 24);
-    doubleBytes[5] = (byte) (val >> 16);
-    doubleBytes[6] = (byte) (val >> 8);
-    doubleBytes[7] = (byte) val;
-    out.write(doubleBytes, 0, 8);
-    return doubleBytes.length;
-  }
-
-  private byte[] vLongBytes = new byte[9];
-
-  public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
-    if (l >= -112 && l <= 127) {
-      bytes[offset] = (byte) l;
-      return 1;
-    }
-
-    int len = -112;
-    if (l < 0) {
-      l ^= -1L; // take one's complement'
-      len = -120;
-    }
-
-    long tmp = l;
-    while (tmp != 0) {
-      tmp = tmp >> 8;
-      len--;
-    }
-
-    bytes[offset++] = (byte) len;
-    len = (len < -120) ? -(len + 120) : -(len + 112);
-
-    for (int idx = len; idx != 0; idx--) {
-      int shiftbits = (idx - 1) * 8;
-      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
-    }
-    return 1 + len;
-  }
-
-  public int writeVLong(OutputStream out, long l) throws IOException {
-    int len = writeVLongToByteArray(vLongBytes, 0, l);
-    out.write(vLongBytes, 0, len);
-    return len;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
new file mode 100644
index 0000000..ed034be
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class BinarySerializerDeserializer implements SerializerDeserializer {
+
+  static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
+
+  @Override
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+      throws IOException {
+    byte[] bytes;
+    int length = 0;
+    if (datum == null || datum instanceof NullDatum) {
+      return 0;
+    }
+
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+      case BIT:
+      case CHAR:
+        bytes = datum.asByteArray();
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case INT2:
+        length = writeShort(out, datum.asInt2());
+        break;
+      case INT4:
+        length = writeVLong(out, datum.asInt4());
+        break;
+      case INT8:
+        length = writeVLong(out, datum.asInt8());
+        break;
+      case FLOAT4:
+        length = writeFloat(out, datum.asFloat4());
+        break;
+      case FLOAT8:
+        length = writeDouble(out, datum.asFloat8());
+        break;
+      case TEXT: {
+        bytes = datum.asTextBytes();
+        length = datum.size();
+        if (length == 0) {
+          bytes = INVALID_UTF__SINGLE_BYTE;
+          length = INVALID_UTF__SINGLE_BYTE.length;
+        }
+        out.write(bytes, 0, bytes.length);
+        break;
+      }
+      case BLOB:
+      case INET4:
+      case INET6:
+        bytes = datum.asByteArray();
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+        bytes = protobufDatum.asByteArray();
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case NULL_TYPE:
+        break;
+      default:
+        throw new IOException("Does not support type");
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+    if (length == 0) return NullDatum.get();
+
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+        datum = DatumFactory.createBool(bytes[offset]);
+        break;
+      case BIT:
+        datum = DatumFactory.createBit(bytes[offset]);
+        break;
+      case CHAR: {
+        byte[] chars = new byte[length];
+        System.arraycopy(bytes, offset, chars, 0, length);
+        datum = DatumFactory.createChar(chars);
+        break;
+      }
+      case INT2:
+        datum = DatumFactory.createInt2(Bytes.toShort(bytes, offset, length));
+        break;
+      case INT4:
+        datum = DatumFactory.createInt4((int) Bytes.readVLong(bytes, offset));
+        break;
+      case INT8:
+        datum = DatumFactory.createInt8(Bytes.readVLong(bytes, offset));
+        break;
+      case FLOAT4:
+        datum = DatumFactory.createFloat4(toFloat(bytes, offset, length));
+        break;
+      case FLOAT8:
+        datum = DatumFactory.createFloat8(toDouble(bytes, offset, length));
+        break;
+      case TEXT: {
+        byte[] chars = new byte[length];
+        System.arraycopy(bytes, offset, chars, 0, length);
+
+        if (Bytes.equals(INVALID_UTF__SINGLE_BYTE, chars)) {
+          datum = DatumFactory.createText(new byte[0]);
+        } else {
+          datum = DatumFactory.createText(chars);
+        }
+        break;
+      }
+      case PROTOBUF: {
+        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
+        Message.Builder builder = factory.newBuilder();
+        builder.mergeFrom(bytes, offset, length);
+        datum = factory.createDatum(builder);
+        break;
+      }
+      case INET4:
+        datum = DatumFactory.createInet4(bytes, offset, length);
+        break;
+      case BLOB:
+        datum = DatumFactory.createBlob(bytes, offset, length);
+        break;
+      default:
+        datum = NullDatum.get();
+    }
+    return datum;
+  }
+
+  private byte[] shortBytes = new byte[2];
+
+  public int writeShort(OutputStream out, short val) throws IOException {
+    shortBytes[0] = (byte) (val >> 8);
+    shortBytes[1] = (byte) val;
+    out.write(shortBytes, 0, 2);
+    return 2;
+  }
+
+  public float toFloat(byte[] bytes, int offset, int length) {
+    Preconditions.checkArgument(length == 4);
+
+    int val = ((bytes[offset] & 0x000000FF) << 24) +
+        ((bytes[offset + 1] & 0x000000FF) << 16) +
+        ((bytes[offset + 2] & 0x000000FF) << 8) +
+        (bytes[offset + 3] & 0x000000FF);
+    return Float.intBitsToFloat(val);
+  }
+
+  private byte[] floatBytes = new byte[4];
+
+  public int writeFloat(OutputStream out, float f) throws IOException {
+    int val = Float.floatToIntBits(f);
+
+    floatBytes[0] = (byte) (val >> 24);
+    floatBytes[1] = (byte) (val >> 16);
+    floatBytes[2] = (byte) (val >> 8);
+    floatBytes[3] = (byte) val;
+    out.write(floatBytes, 0, 4);
+    return floatBytes.length;
+  }
+
+  public double toDouble(byte[] bytes, int offset, int length) {
+    Preconditions.checkArgument(length == 8);
+    long val = ((long) (bytes[offset] & 0x00000000000000FF) << 56) +
+        ((long) (bytes[offset + 1] & 0x00000000000000FF) << 48) +
+        ((long) (bytes[offset + 2] & 0x00000000000000FF) << 40) +
+        ((long) (bytes[offset + 3] & 0x00000000000000FF) << 32) +
+        ((long) (bytes[offset + 4] & 0x00000000000000FF) << 24) +
+        ((long) (bytes[offset + 5] & 0x00000000000000FF) << 16) +
+        ((long) (bytes[offset + 6] & 0x00000000000000FF) << 8) +
+        (long) (bytes[offset + 7] & 0x00000000000000FF);
+    return Double.longBitsToDouble(val);
+  }
+
+  private byte[] doubleBytes = new byte[8];
+
+  public int writeDouble(OutputStream out, double d) throws IOException {
+    long val = Double.doubleToLongBits(d);
+
+    doubleBytes[0] = (byte) (val >> 56);
+    doubleBytes[1] = (byte) (val >> 48);
+    doubleBytes[2] = (byte) (val >> 40);
+    doubleBytes[3] = (byte) (val >> 32);
+    doubleBytes[4] = (byte) (val >> 24);
+    doubleBytes[5] = (byte) (val >> 16);
+    doubleBytes[6] = (byte) (val >> 8);
+    doubleBytes[7] = (byte) val;
+    out.write(doubleBytes, 0, 8);
+    return doubleBytes.length;
+  }
+
+  private byte[] vLongBytes = new byte[9];
+
+  public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
+    if (l >= -112 && l <= 127) {
+      bytes[offset] = (byte) l;
+      return 1;
+    }
+
+    int len = -112;
+    if (l < 0) {
+      l ^= -1L; // take one's complement'
+      len = -120;
+    }
+
+    long tmp = l;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+
+    bytes[offset++] = (byte) len;
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
+    }
+    return 1 + len;
+  }
+
+  public int writeVLong(OutputStream out, long l) throws IOException {
+    int len = writeVLongToByteArray(vLongBytes, 0, l);
+    out.write(vLongBytes, 0, len);
+    return len;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 6e0dc32..5d05d6f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -49,6 +49,7 @@ public class CSVFile {
 
   public static final String DELIMITER = "csvfile.delimiter";
   public static final String NULL = "csvfile.null";     //read only
+  public static final String SERDE = "csvfile.serde";
   public static final String DELIMITER_DEFAULT = "|";
   public static final byte LF = '\n';
   public static int EOF = -1;
@@ -75,7 +76,7 @@ public class CSVFile {
     private long pos = 0;
 
     private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE);
-    private TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
+    private SerializerDeserializer serde;
 
     public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException {
       super(conf, schema, meta, path);
@@ -114,7 +115,7 @@ public class CSVFile {
 
         fos = fs.create(compressedPath);
         deflateFilter = codec.createOutputStream(fos, compressor);
-        outputStream = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+        outputStream = new DataOutputStream(deflateFilter);
 
       } else {
         if (fs.exists(path)) {
@@ -128,6 +129,14 @@ public class CSVFile {
         this.stats = new TableStatistics(this.schema);
       }
 
+      try {
+        String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
+        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new IOException(e);
+      }
+
       os.reset();
       pos = fos.getPos();
       bufferedBytes = 0;
@@ -142,7 +151,7 @@ public class CSVFile {
 
       for (int i = 0; i < columnNum; i++) {
         datum = tuple.get(i);
-        rowBytes += serializeDeserialize.serialize(schema.getColumn(i), datum, os, nullChars);
+        rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
 
         if(columnNum - 1 > i){
           os.write((byte) delimiter);
@@ -251,7 +260,7 @@ public class CSVFile {
       }
     }
 
-    private final static int DEFAULT_BUFFER_SIZE = 128 * 1024;
+    private final static int DEFAULT_PAGE_SIZE = 256 * 1024;
     private char delimiter;
     private FileSystem fs;
     private FSDataInputStream fis;
@@ -261,16 +270,17 @@ public class CSVFile {
     private Decompressor decompressor;
     private Seekable filePosition;
     private boolean splittable = false;
-    private long startOffset, length, end, pos;
-    private int currentIdx = 0, validIdx = 0;
+    private long startOffset, end, pos;
+    private int currentIdx = 0, validIdx = 0, recordCount = 0;
     private int[] targetColumnIndexes;
     private boolean eof = false;
     private final byte[] nullChars;
-    private LineReader reader;
+    private SplitLineReader reader;
     private ArrayList<Long> fileOffsets = new ArrayList<Long>();
     private ArrayList<Integer> rowLengthList = new ArrayList<Integer>();
     private ArrayList<Integer> startOffsets = new ArrayList<Integer>();
-    private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
+    private NonSyncByteArrayOutputStream buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE);
+    private SerializerDeserializer serde;
 
     @Override
     public void init() throws IOException {
@@ -281,32 +291,34 @@ public class CSVFile {
       }
       if(fis == null) fis = fs.open(fragment.getPath());
 
+      recordCount = 0;
       pos = startOffset = fragment.getStartKey();
-      length = fragment.getEndKey();
-      end = startOffset + length;
-      fis.seek(startOffset);
+      end = startOffset + fragment.getEndKey();
 
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         if (codec instanceof SplittableCompressionCodec) {
           SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
-              fis, decompressor, startOffset, startOffset + length,
+              fis, decompressor, startOffset, end,
               SplittableCompressionCodec.READ_MODE.BYBLOCK);
 
+          reader = new CompressedSplitLineReader(cIn, conf, null);
           startOffset = cIn.getAdjustedStart();
-          length = cIn.getAdjustedEnd() - startOffset;
+          end = cIn.getAdjustedEnd();
           filePosition = cIn;
           is = cIn;
         } else {
           is = new DataInputStream(codec.createInputStream(fis, decompressor));
+          reader = new SplitLineReader(is, null);
           filePosition = fis;
         }
       } else {
+        fis.seek(startOffset);
         filePosition = fis;
         is = fis;
+        reader = new SplitLineReader(is, null);
       }
 
-      reader = new LineReader(is, DEFAULT_BUFFER_SIZE);
       if (targets == null) {
         targets = schema.toArray();
       }
@@ -316,15 +328,24 @@ public class CSVFile {
         targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
       }
 
+      try {
+        String serdeClass = this.meta.getOption(SERDE, TextSerializerDeserializer.class.getName());
+        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new IOException(e);
+      }
+
       super.init();
       Arrays.sort(targetColumnIndexes);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + length +
+        LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end +
             "," + fs.getFileStatus(fragment.getPath()).getLen());
       }
 
       if (startOffset != 0) {
-        pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos));
+        startOffset += reader.readLine(new Text(), 0, maxBytesToConsume(startOffset));
+        pos = startOffset;
       }
       eof = false;
       page();
@@ -362,12 +383,11 @@ public class CSVFile {
 
       if(eof) return;
 
-      while (DEFAULT_BUFFER_SIZE > bufferedSize){
+      while (DEFAULT_PAGE_SIZE >= bufferedSize){
 
         int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE);
 
-        if(ret <= 0){
-          eof = true;
+        if(ret == 0){
           break;
         } else {
           fileOffsets.add(pos);
@@ -376,9 +396,13 @@ public class CSVFile {
           currentBufferPos += rowLengthList.get(rowLengthList.size() - 1);
           bufferedSize += ret;
           validIdx++;
+          recordCount++;
         }
 
-        if(isSplittable() && getFilePosition() > end) break;
+        if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){
+          eof = true;
+          break;
+        }
       }
     }
 
@@ -386,7 +410,7 @@ public class CSVFile {
     public Tuple next() throws IOException {
       try {
         if (currentIdx == validIdx) {
-          if (isSplittable() && fragmentable() <= 0) {
+          if (eof) {
             return null;
           } else {
             page();
@@ -405,7 +429,7 @@ public class CSVFile {
         byte[][] cells = Bytes.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx),
             rowLengthList.get(currentIdx),  delimiter, targetColumnIndexes);
         currentIdx++;
-        return new LazyTuple(schema, cells, offset, nullChars);
+        return new LazyTuple(schema, cells, offset, nullChars, serde);
       } catch (Throwable t) {
         LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t);
         LOG.error("Tuple list current index: " + currentIdx, t);
@@ -420,7 +444,6 @@ public class CSVFile {
     @Override
     public void reset() throws IOException {
       if (decompressor != null) {
-        decompressor.reset();
         CodecPool.returnDecompressor(decompressor);
         decompressor = null;
       }
@@ -431,11 +454,15 @@ public class CSVFile {
     @Override
     public void close() throws IOException {
       try {
-        IOUtils.cleanup(LOG, is, fis);
+        IOUtils.cleanup(LOG, reader, is, fis);
         fs = null;
+        is = null;
+        fis = null;
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("CSVScanner processed record:" + recordCount);
+        }
       } finally {
         if (decompressor != null) {
-          decompressor.reset();
           CodecPool.returnDecompressor(decompressor);
           decompressor = null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
new file mode 100644
index 0000000..4f58e68
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.SplitCompressionInputStream;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * Line reader for compressed splits
+ *
+ * Reading records from a compressed split is tricky, as the
+ * LineRecordReader is using the reported compressed input stream
+ * position directly to determine when a split has ended.  In addition the
+ * compressed input stream is usually faking the actual byte position, often
+ * updating it only after the first compressed block after the split is
+ * accessed.
+ *
+ * Depending upon where the last compressed block of the split ends relative
+ * to the record delimiters it can be easy to accidentally drop the last
+ * record or duplicate the last record between this split and the next.
+ *
+ * Split end scenarios:
+ *
+ * 1) Last block of split ends in the middle of a record
+ *      Nothing special that needs to be done here, since the compressed input
+ *      stream will report a position after the split end once the record
+ *      is fully read.  The consumer of the next split will discard the
+ *      partial record at the start of the split normally, and no data is lost
+ *      or duplicated between the splits.
+ *
+ * 2) Last block of split ends in the middle of a delimiter
+ *      The line reader will continue to consume bytes into the next block to
+ *      locate the end of the delimiter.  If a custom delimiter is being used
+ *      then the next record must be read by this split or it will be dropped.
+ *      The consumer of the next split will not recognize the partial
+ *      delimiter at the beginning of its split and will discard it along with
+ *      the next record.
+ *
+ *      However for the default delimiter processing there is a special case
+ *      because CR, LF, and CRLF are all valid record delimiters.  If the
+ *      block ends with a CR then the reader must peek at the next byte to see
+ *      if it is an LF and therefore part of the same record delimiter.
+ *      Peeking at the next byte is an access to the next block and triggers
+ *      the stream to report the end of the split.  There are two cases based
+ *      on the next byte:
+ *
+ *      A) The next byte is LF
+ *           The split needs to end after the current record is returned.  The
+ *           consumer of the next split will discard the first record, which
+ *           is degenerate since LF is itself a delimiter, and start consuming
+ *           records after that byte.  If the current split tries to read
+ *           another record then the record will be duplicated between splits.
+ *
+ *      B) The next byte is not LF
+ *           The current record will be returned but the stream will report
+ *           the split has ended due to the peek into the next block.  If the
+ *           next record is not read then it will be lost, as the consumer of
+ *           the next split will discard it before processing subsequent
+ *           records.  Therefore the next record beyond the reported split end
+ *           must be consumed by this split to avoid data loss.
+ *
+ * 3) Last block of split ends at the beginning of a delimiter
+ *      This is equivalent to case 1, as the reader will consume bytes into
+ *      the next block and trigger the end of the split.  No further records
+ *      should be read as the consumer of the next split will discard the
+ *      (degenerate) record at the beginning of its split.
+ *
+ * 4) Last block of split ends at the end of a delimiter
+ *      Nothing special needs to be done here. The reader will not start
+ *      examining the bytes into the next block until the next record is read,
+ *      so the stream will not report the end of the split just yet.  Once the
+ *      next record is read then the next block will be accessed and the
+ *      stream will indicate the end of the split.  The consumer of the next
+ *      split will correctly discard the first record of its split, and no
+ *      data is lost or duplicated.
+ *
+ *      If the default delimiter is used and the block ends at a CR then this
+ *      is treated as case 2 since the reader does not yet know without
+ *      looking at subsequent bytes whether the delimiter has ended.
+ *
+ * NOTE: It is assumed that compressed input streams *never* return bytes from
+ *       multiple compressed blocks from a single read.  Failure to do so will
+ *       violate the buffering performed by this class, as it will access
+ *       bytes into the next block after the split before returning all of the
+ *       records from the previous block.
+ */
+
+public class CompressedSplitLineReader extends SplitLineReader {
+  SplitCompressionInputStream scin;
+  private boolean usingCRLF;
+  private boolean needAdditionalRecord = false;
+  private boolean finished = false;
+
+  public CompressedSplitLineReader(SplitCompressionInputStream in,
+                                   Configuration conf,
+                                   byte[] recordDelimiterBytes)
+      throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    scin = in;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int bytesRead = in.read(buffer);
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
+    if (inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    int bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (scin.getPos() > scin.getAdjustedEnd()) {
+        finished = true;
+      }
+
+      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+      , int maxBytesToConsume) throws IOException {
+    int bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (scin.getPos() > scin.getAdjustedEnd()) {
+        finished = true;
+      }
+
+      bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 9765d13..9967d7a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -31,18 +31,19 @@ public class LazyTuple implements Tuple {
   private byte[][] textBytes;
   private Schema schema;
   private byte[] nullBytes;
-  private static TextSerializeDeserialize serializeDeserialize = new TextSerializeDeserialize();
+  private SerializerDeserializer serializeDeserialize;
 
   public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
-    this(schema, textBytes, offset, NullDatum.get().asTextBytes());
+    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
   }
 
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes) {
+  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
     this.schema = schema;
     this.textBytes = textBytes;
     this.values = new Datum[schema.getColumnNum()];
     this.offset = offset;
     this.nullBytes = nullBytes;
+    this.serializeDeserialize = serde;
   }
 
   public LazyTuple(LazyTuple tuple) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
index f48c482..66c610a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/LineReader.java
@@ -185,6 +185,10 @@ public class LineReader implements Closeable {
     }
   }
 
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    return in.read(buffer);
+  }
   /**
    * Read a line terminated by one of CR, LF, or CRLF.
    */
@@ -218,7 +222,7 @@ public class LineReader implements Closeable {
         if (prevCharCR) {
           ++bytesConsumed; //account for CR from previous read
         }
-        bufferLength = in.read(buffer);
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
         if (bufferLength <= 0) {
           break; // EOF
         }
@@ -290,7 +294,7 @@ public class LineReader implements Closeable {
         if (prevCharCR) {
           ++bytesConsumed; //account for CR from previous read
         }
-        bufferLength = in.read(buffer);
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
         if (bufferLength <= 0) {
           break; // EOF
         }
@@ -482,7 +486,7 @@ public class LineReader implements Closeable {
       int startPosn = bufferPosn; // Start from previous end position
       if (bufferPosn >= bufferLength) {
         startPosn = bufferPosn = 0;
-        bufferLength = in.read(buffer);
+        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
         if (bufferLength <= 0) {
           str.append(recordDelimiterBytes, 0, ambiguousByteCount);
           break; // EOF

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java
deleted file mode 100644
index 53f3ad8..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializeDeserialize.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-
-public interface SerializeDeserialize {
-
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
-
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
new file mode 100644
index 0000000..333f205
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+
+public interface SerializerDeserializer {
+
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
+
+  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
new file mode 100644
index 0000000..3579674
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SplitLineReader extends LineReader {
+  public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) {
+    super(in, recordDelimiterBytes);
+  }
+
+  public SplitLineReader(InputStream in, Configuration conf,
+                         byte[] recordDelimiterBytes) throws IOException {
+    super(in, conf, recordDelimiterBytes);
+  }
+
+  public boolean needAdditionalRecordAfterSplit() {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
deleted file mode 100644
index 6afd9b9..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializeDeserialize.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.protobuf.Message;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-//Compatibility with Apache Hive
-public class TextSerializeDeserialize implements SerializeDeserialize {
-  public static byte[] trueBytes = "true".getBytes();
-  public static byte[] falseBytes = "false".getBytes();
-  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
-
-
-  @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
-
-    byte[] bytes;
-    int length = 0;
-    TajoDataTypes.DataType dataType = col.getDataType();
-
-    if (datum == null || datum instanceof NullDatum) {
-      switch (dataType.getType()) {
-        case CHAR:
-        case TEXT:
-          length = nullCharacters.length;
-          out.write(nullCharacters);
-          break;
-        default:
-          break;
-      }
-      return length;
-    }
-
-    switch (dataType.getType()) {
-      case BOOLEAN:
-        out.write(datum.asBool() ? trueBytes : falseBytes);
-        length = trueBytes.length;
-        break;
-      case CHAR:
-        byte[] pad = new byte[dataType.getLength() - datum.size()];
-        bytes = datum.asTextBytes();
-        out.write(bytes);
-        out.write(pad);
-        length = bytes.length + pad.length;
-        break;
-      case TEXT:
-      case BIT:
-      case INT2:
-      case INT4:
-      case INT8:
-      case FLOAT4:
-      case FLOAT8:
-      case INET4:
-      case DATE:
-      case TIMESTAMP:
-        bytes = datum.asTextBytes();
-        length = bytes.length;
-        out.write(bytes);
-        break;
-      case INET6:
-      case BLOB:
-        bytes = Base64.encodeBase64(datum.asByteArray(), false);
-        length = bytes.length;
-        out.write(bytes, 0, length);
-        break;
-      case PROTOBUF:
-        ProtobufDatum protobuf = (ProtobufDatum) datum;
-        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
-        length = protoBytes.length;
-        out.write(protoBytes, 0, protoBytes.length);
-        break;
-      case NULL_TYPE:
-      default:
-        break;
-    }
-    return length;
-  }
-
-  @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
-
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case BOOLEAN:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
-        break;
-      case BIT:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
-        break;
-      case CHAR:
-        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createChar(new String(bytes, offset, length).trim());
-        break;
-      case INT2:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt2(new String(bytes, offset, length));
-        break;
-      case INT4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt4(new String(bytes, offset, length));
-        break;
-      case INT8:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInt8(new String(bytes, offset, length));
-        break;
-      case FLOAT4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createFloat4(new String(bytes, offset, length));
-        break;
-      case FLOAT8:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createFloat8(new String(bytes, offset, length));
-        break;
-      case TEXT: {
-        byte[] chars = new byte[length];
-        System.arraycopy(bytes, offset, chars, 0, length);
-        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createText(chars);
-        break;
-      }
-      case DATE:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createDate(new String(bytes, offset, length));
-        break;
-      case TIMESTAMP:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createTimeStamp(new String(bytes, offset, length));
-        break;
-      case PROTOBUF: {
-        if (isNull(bytes, offset, length, nullCharacters)) {
-          datum = NullDatum.get();
-        } else {
-          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
-          Message.Builder builder = factory.newBuilder();
-          try {
-            byte[] protoBytes = new byte[length];
-            System.arraycopy(bytes, offset, protoBytes, 0, length);
-            protobufJsonFormat.merge(protoBytes, builder);
-            datum = factory.createDatum(builder.build());
-          } catch (IOException e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-          }
-        }
-        break;
-      }
-      case INET4:
-        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
-            : DatumFactory.createInet4(new String(bytes, offset, length));
-        break;
-      case BLOB: {
-        if (isNull(bytes, offset, length, nullCharacters)) {
-          datum = NullDatum.get();
-        } else {
-          byte[] blob = new byte[length];
-          System.arraycopy(bytes, offset, blob, 0, length);
-          datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
-        }
-        break;
-      }
-      default:
-        datum = NullDatum.get();
-        break;
-    }
-    return datum;
-  }
-
-  private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
-    return length == 0 || ((length == nullBytes.length)
-        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
-  }
-
-  private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
-    return length > 0 && length == nullBytes.length
-        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
new file mode 100644
index 0000000..b80b461
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.protobuf.Message;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+//Compatibility with Apache Hive
+public class TextSerializerDeserializer implements SerializerDeserializer {
+  public static byte[] trueBytes = "true".getBytes();
+  public static byte[] falseBytes = "false".getBytes();
+  private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+
+
+  @Override
+  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
+
+    byte[] bytes;
+    int length = 0;
+    TajoDataTypes.DataType dataType = col.getDataType();
+
+    if (datum == null || datum instanceof NullDatum) {
+      switch (dataType.getType()) {
+        case CHAR:
+        case TEXT:
+          length = nullCharacters.length;
+          out.write(nullCharacters);
+          break;
+        default:
+          break;
+      }
+      return length;
+    }
+
+    switch (dataType.getType()) {
+      case BOOLEAN:
+        out.write(datum.asBool() ? trueBytes : falseBytes);
+        length = trueBytes.length;
+        break;
+      case CHAR:
+        byte[] pad = new byte[dataType.getLength() - datum.size()];
+        bytes = datum.asTextBytes();
+        out.write(bytes);
+        out.write(pad);
+        length = bytes.length + pad.length;
+        break;
+      case TEXT:
+      case BIT:
+      case INT2:
+      case INT4:
+      case INT8:
+      case FLOAT4:
+      case FLOAT8:
+      case INET4:
+      case DATE:
+      case TIMESTAMP:
+        bytes = datum.asTextBytes();
+        length = bytes.length;
+        out.write(bytes);
+        break;
+      case INET6:
+      case BLOB:
+        bytes = Base64.encodeBase64(datum.asByteArray(), false);
+        length = bytes.length;
+        out.write(bytes, 0, length);
+        break;
+      case PROTOBUF:
+        ProtobufDatum protobuf = (ProtobufDatum) datum;
+        byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
+        length = protoBytes.length;
+        out.write(protoBytes, 0, protoBytes.length);
+        break;
+      case NULL_TYPE:
+      default:
+        break;
+    }
+    return length;
+  }
+
+  @Override
+  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case BOOLEAN:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');
+        break;
+      case BIT:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length)));
+        break;
+      case CHAR:
+        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createChar(new String(bytes, offset, length).trim());
+        break;
+      case INT2:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt2(new String(bytes, offset, length));
+        break;
+      case INT4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt4(new String(bytes, offset, length));
+        break;
+      case INT8:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInt8(new String(bytes, offset, length));
+        break;
+      case FLOAT4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createFloat4(new String(bytes, offset, length));
+        break;
+      case FLOAT8:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createFloat8(new String(bytes, offset, length));
+        break;
+      case TEXT: {
+        byte[] chars = new byte[length];
+        System.arraycopy(bytes, offset, chars, 0, length);
+        datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createText(chars);
+        break;
+      }
+      case DATE:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createDate(new String(bytes, offset, length));
+        break;
+      case TIMESTAMP:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createTimeStamp(new String(bytes, offset, length));
+        break;
+      case PROTOBUF: {
+        if (isNull(bytes, offset, length, nullCharacters)) {
+          datum = NullDatum.get();
+        } else {
+          ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType());
+          Message.Builder builder = factory.newBuilder();
+          try {
+            byte[] protoBytes = new byte[length];
+            System.arraycopy(bytes, offset, protoBytes, 0, length);
+            protobufJsonFormat.merge(protoBytes, builder);
+            datum = factory.createDatum(builder.build());
+          } catch (IOException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e);
+          }
+        }
+        break;
+      }
+      case INET4:
+        datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
+            : DatumFactory.createInet4(new String(bytes, offset, length));
+        break;
+      case BLOB: {
+        if (isNull(bytes, offset, length, nullCharacters)) {
+          datum = NullDatum.get();
+        } else {
+          byte[] blob = new byte[length];
+          System.arraycopy(bytes, offset, blob, 0, length);
+          datum = DatumFactory.createBlob(Base64.decodeBase64(blob));
+        }
+        break;
+      }
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
+  private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) {
+    return length == 0 || ((length == nullBytes.length)
+        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length));
+  }
+
+  private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) {
+    return length > 0 && length == nullBytes.length
+        && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 74e2192..9fe0bce 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -590,7 +590,7 @@ public class RCFile {
     private ColumnBuffer[] columnBuffers = null;
     boolean useNewMagic = true;
     private byte[] nullChars;
-    SerializeDeserialize serde;
+    SerializerDeserializer serde;
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -742,9 +742,9 @@ public class RCFile {
 
       metadata.set(new Text(COLUMN_NUMBER_METADATA_STR), new Text("" + columnNumber));
 
-      String serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+      String serdeClass = this.meta.getOption(SERDE, BinarySerializerDeserializer.class.getName());
       try {
-        serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);
@@ -1152,7 +1152,7 @@ public class RCFile {
 
     private LongWritable rowId;
     private byte[] nullChars;
-    private SerializeDeserialize serde;
+    private SerializerDeserializer serde;
 
     public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
                          final FileFragment fragment) throws IOException {
@@ -1347,9 +1347,9 @@ public class RCFile {
         if(text != null && !text.toString().isEmpty()){
           serdeClass = text.toString();
         } else{
-          serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+          serdeClass = this.meta.getOption(SERDE, BinarySerializerDeserializer.class.getName());
         }
-        serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+        serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
index 0b63d20..b7a5929 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
@@ -30,13 +30,10 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.BinarySerializeDeserialize;
 import org.apache.tajo.storage.rcfile.BytesRefArrayWritable;
 import org.apache.tajo.storage.rcfile.ColumnProjectionUtils;
-import org.apache.tajo.storage.SerializeDeserialize;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
@@ -57,7 +54,7 @@ public class RCFileScanner extends FileScannerV2 {
   private ScheduledInputStream sin;
   private boolean first = true;
   private int maxBytesPerSchedule;
-  SerializeDeserialize serde;
+  SerializerDeserializer serde;
   byte[] nullChars;
 
   public RCFileScanner(final Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
@@ -206,9 +203,9 @@ public class RCFileScanner extends FileScannerV2 {
           if(text != null && !text.toString().isEmpty()){
             serdeClass = text.toString();
           } else{
-            serdeClass = this.meta.getOption(SERDE, BinarySerializeDeserialize.class.getName());
+            serdeClass = this.meta.getOption(SERDE, BinarySerializerDeserializer.class.getName());
           }
-          serde = (SerializeDeserialize) Class.forName(serdeClass).newInstance();
+          serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
           throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 08a181a..bec1556 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -178,7 +178,7 @@ public class TestCompressionStorages {
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     meta.putOption("compression.codec", codec.getCanonicalName());
-    meta.putOption("rcfile.serde", "org.apache.tajo.storage.TextSerializeDeserialize");
+    meta.putOption("rcfile.serde", TextSerializerDeserializer.class.getName());
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
index 811a191..fecbe89 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -34,6 +34,7 @@ public class TestLazyTuple {
   Schema schema;
   byte[][] textRow;
   byte[] nullbytes;
+  SerializerDeserializer serde;
 
   @Before
   public void setUp() {
@@ -69,12 +70,13 @@ public class TestLazyTuple {
     sb.append(new String(nullbytes)).append('|');
     sb.append(NullDatum.get());
     textRow = Bytes.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+    serde = new TextSerializerDeserializer();
   }
 
   @Test
   public void testGetDatum() {
 
-    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes);
+    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
     assertEquals(DatumFactory.createBool(true), t1.get(0));
     assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
     assertEquals(DatumFactory.createChar("str"), t1.get(2));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/2c53ccc1/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 2faae7f..16b370c 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -268,7 +268,7 @@ public class TestStorages {
 
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
-    meta.putOption(RCFile.SERDE, TextSerializeDeserialize.class.getName());
+    meta.putOption(RCFile.SERDE, TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
@@ -331,7 +331,7 @@ public class TestStorages {
 
     Options options = new Options();
     TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
-    meta.putOption(RCFile.SERDE, BinarySerializeDeserialize.class.getName());
+    meta.putOption(RCFile.SERDE, BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);