You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/16 00:42:24 UTC

[02/13] tajo git commit: TAJO-1450: Encapsulate Datum in Tuple.

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
index cba9ee0..b25b2fd 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -110,8 +110,8 @@ public class BaseTupleComparator extends TupleComparator implements ProtoObject<
   @Override
   public int compare(Tuple tuple1, Tuple tuple2) {
     for (int i = 0; i < sortKeyIds.length; i++) {
-      left = tuple1.get(sortKeyIds[i]);
-      right = tuple2.get(sortKeyIds[i]);
+      left = tuple1.asDatum(sortKeyIds[i]);
+      right = tuple2.asDatum(sortKeyIds[i]);
 
       if (left.isNull() || right.isNull()) {
         if (!left.equals(right)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
index a3b8da8..2cccb69 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.Message;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
 import org.apache.tajo.util.Bytes;
@@ -33,50 +34,58 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
 
   static final byte[] INVALID_UTF__SINGLE_BYTE = {(byte) Integer.parseInt("10111111", 2)};
 
+  private Schema schema;
+
   @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters)
+  public void init(Schema schema) {
+    this.schema = schema;
+  }
+
+  @Override
+  public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters)
       throws IOException {
-    byte[] bytes;
-    int length = 0;
-    if (datum == null || datum instanceof NullDatum) {
+    if (tuple.isBlankOrNull(index)) {
       return 0;
     }
 
-    switch (col.getDataType().getType()) {
+    byte[] bytes;
+    int length = 0;
+    Column column = schema.getColumn(index);
+    switch (column.getDataType().getType()) {
       case BOOLEAN:
       case BIT:
-        bytes = datum.asByteArray();
+        bytes = tuple.getBytes(index);
         length = bytes.length;
         out.write(bytes, 0, length);
 				break;
 
       case CHAR:
-        bytes = datum.asByteArray();
+        bytes = tuple.getBytes(index);
         length = bytes.length;
-        if (length > col.getDataType().getLength()) {
-          throw new ValueTooLongForTypeCharactersException(col.getDataType().getLength());
+        if (length > column.getDataType().getLength()) {
+          throw new ValueTooLongForTypeCharactersException(column.getDataType().getLength());
         }
 
         out.write(bytes, 0, length);
         break;
       case INT2:
-        length = writeShort(out, datum.asInt2());
+        length = writeShort(out, tuple.getInt2(index));
         break;
       case INT4:
-        length = writeVLong(out, datum.asInt4());
+        length = writeVLong(out, tuple.getInt4(index));
         break;
       case INT8:
-        length = writeVLong(out, datum.asInt8());
+        length = writeVLong(out, tuple.getInt8(index));
         break;
       case FLOAT4:
-        length = writeFloat(out, datum.asFloat4());
+        length = writeFloat(out, tuple.getFloat4(index));
         break;
       case FLOAT8:
-        length = writeDouble(out, datum.asFloat8());
+        length = writeDouble(out, tuple.getFloat8(index));
         break;
       case TEXT: {
-        bytes = datum.asTextBytes();
-        length = datum.size();
+        bytes = tuple.getTextBytes(index);
+        length = bytes.length;
         if (length == 0) {
           bytes = INVALID_UTF__SINGLE_BYTE;
           length = INVALID_UTF__SINGLE_BYTE.length;
@@ -87,12 +96,12 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
       case BLOB:
       case INET4:
       case INET6:
-        bytes = datum.asByteArray();
+        bytes = tuple.getBytes(index);
         length = bytes.length;
         out.write(bytes, 0, length);
         break;
       case PROTOBUF:
-        ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+        ProtobufDatum protobufDatum = (ProtobufDatum) tuple.getProtobufDatum(index);
         bytes = protobufDatum.asByteArray();
         length = bytes.length;
         out.write(bytes, 0, length);
@@ -106,11 +115,12 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
   }
 
   @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+  public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
     if (length == 0) return NullDatum.get();
 
     Datum datum;
-    switch (col.getDataType().getType()) {
+    Column column = schema.getColumn(index);
+    switch (column.getDataType().getType()) {
       case BOOLEAN:
         datum = DatumFactory.createBool(bytes[offset]);
         break;
@@ -150,7 +160,7 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
         break;
       }
       case PROTOBUF: {
-        ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType().getCode());
+        ProtobufDatumFactory factory = ProtobufDatumFactory.get(column.getDataType().getCode());
         Message.Builder builder = factory.newBuilder();
         builder.mergeFrom(bytes, offset, length);
         datum = factory.createDatum(builder);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
index a5561ed..ed53832 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -22,10 +22,12 @@
 package org.apache.tajo.storage;
 
 import com.google.common.base.Preconditions;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.util.datetime.TimeMeta;
 
 /**
  * An instance of FrameTuple is an immutable tuple.
@@ -82,13 +84,19 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
-  public boolean isNull(int fieldid) {
-    return get(fieldid).isNull();
+  public boolean isBlank(int fieldid) {
+    return asDatum(fieldid) == null;
   }
 
   @Override
-  public boolean isNotNull(int fieldid) {
-    return !isNull(fieldid);
+  public boolean isBlankOrNull(int fieldid) {
+    Datum datum = asDatum(fieldid);
+    return datum == null || datum.isNull();
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException();
   }
 
   @Override
@@ -102,13 +110,18 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
-  public void put(int fieldId, Datum[] values) {
+  public void put(Datum[] values) {
     throw new UnsupportedException();
   }
 
   @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException();
+  public TajoDataTypes.Type type(int fieldId) {
+    return null;
+  }
+
+  @Override
+  public int size(int fieldId) {
+    return 0;
   }
 
   @Override
@@ -122,85 +135,90 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
-  public void put(Datum [] values) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public Datum get(int fieldId) {
+  public Datum asDatum(int fieldId) {
     Preconditions.checkArgument(fieldId < size, 
         "Out of field access: " + fieldId);
     
     if (fieldId < leftSize) {
-      return left.get(fieldId);
+      return left.asDatum(fieldId);
     } else {
-      return right.get(fieldId - leftSize);
+      return right.asDatum(fieldId - leftSize);
     }
   }
 
   @Override
   public boolean getBool(int fieldId) {
-    return get(fieldId).asBool();
+    return asDatum(fieldId).asBool();
   }
 
   @Override
   public byte getByte(int fieldId) {
-    return get(fieldId).asByte();
+    return asDatum(fieldId).asByte();
   }
 
   @Override
   public char getChar(int fieldId) {
-    return get(fieldId).asChar();
+    return asDatum(fieldId).asChar();
   }
 
   @Override
   public byte [] getBytes(int fieldId) {
-    return get(fieldId).asByteArray();
+    return asDatum(fieldId).asByteArray();
+  }
+
+  @Override
+  public byte[] getTextBytes(int fieldId) {
+    return asDatum(fieldId).asTextBytes();
   }
 
   @Override
   public short getInt2(int fieldId) {
-    return get(fieldId).asInt2();
+    return asDatum(fieldId).asInt2();
   }
 
   @Override
   public int getInt4(int fieldId) {
-    return get(fieldId).asInt4();
+    return asDatum(fieldId).asInt4();
   }
 
   @Override
   public long getInt8(int fieldId) {
-    return get(fieldId).asInt8();
+    return asDatum(fieldId).asInt8();
   }
 
   @Override
   public float getFloat4(int fieldId) {
-    return get(fieldId).asFloat4();
+    return asDatum(fieldId).asFloat4();
   }
 
   @Override
   public double getFloat8(int fieldId) {
-    return get(fieldId).asFloat8();
+    return asDatum(fieldId).asFloat8();
   }
 
   @Override
   public String getText(int fieldId) {
-    return get(fieldId).asChars();
+    return asDatum(fieldId).asChars();
+  }
+
+  @Override
+  public TimeMeta getTimeDate(int fieldId) {
+    return null;
   }
 
   @Override
   public ProtobufDatum getProtobufDatum(int fieldId) {
-    return (ProtobufDatum) get(fieldId);
+    return (ProtobufDatum) asDatum(fieldId);
   }
 
   @Override
   public IntervalDatum getInterval(int fieldId) {
-    return (IntervalDatum) get(fieldId);
+    return (IntervalDatum) asDatum(fieldId);
   }
 
   @Override
   public char [] getUnicodeChars(int fieldId) {
-    return get(fieldId).asUnicodeChars();
+    return asDatum(fieldId).asUnicodeChars();
   }
 
   @Override
@@ -228,7 +246,7 @@ public class FrameTuple implements Tuple, Cloneable {
         }
         str.append(i)
         .append("=>")
-        .append(get(i));
+        .append(asDatum(i));
       }
     }
     str.append(")");

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
index bfbe478..7bfc166 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -19,11 +19,13 @@
 package org.apache.tajo.storage;
 
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.util.datetime.TimeMeta;
 
 import java.util.Arrays;
 
@@ -36,7 +38,7 @@ public class LazyTuple implements Tuple, Cloneable {
   private SerializerDeserializer serializeDeserialize;
 
   public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
-    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
+    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer(schema));
   }
 
   public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
@@ -68,13 +70,19 @@ public class LazyTuple implements Tuple, Cloneable {
   }
 
   @Override
-  public boolean isNull(int fieldid) {
-    return get(fieldid).isNull();
+  public boolean isBlank(int fieldid) {
+    return get(fieldid) == null;
   }
 
   @Override
-  public boolean isNotNull(int fieldid) {
-    return !isNull(fieldid);
+  public boolean isBlankOrNull(int fieldid) {
+    Datum datum = get(fieldid);
+    return datum == null || datum.isNull();
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    this.put(fieldId, tuple.asDatum(fieldId));
   }
 
   @Override
@@ -95,31 +103,28 @@ public class LazyTuple implements Tuple, Cloneable {
   }
 
   @Override
-  public void put(int fieldId, Datum[] values) {
-    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
-      this.values[i] = values[j];
-    }
-    this.textBytes = new byte[values.length][];
+  public void put(Datum[] values) {
+    System.arraycopy(values, 0, this.values, 0, this.values.length);
   }
 
   @Override
-  public void put(int fieldId, Tuple tuple) {
-    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
-      values[i] = tuple.get(j);
-      textBytes[i] = null;
-    }
+  public Datum asDatum(int fieldId) {
+     return get(fieldId);
   }
 
   @Override
-  public void put(Datum[] values) {
-    System.arraycopy(values, 0, this.values, 0, size());
-    this.textBytes = new byte[values.length][];
+  public TajoDataTypes.Type type(int fieldId) {
+    return get(fieldId).type();
+  }
+
+  @Override
+  public int size(int fieldId) {
+    return get(fieldId).size();
   }
 
   //////////////////////////////////////////////////////
   // Getter
   //////////////////////////////////////////////////////
-  @Override
   public Datum get(int fieldId) {
     if (values[fieldId] != null)
       return values[fieldId];
@@ -127,7 +132,7 @@ public class LazyTuple implements Tuple, Cloneable {
       values[fieldId] = NullDatum.get();  // split error. (col : 3, separator: ',', row text: "a,")
     } else if (textBytes[fieldId] != null) {
       try {
-        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
+        values[fieldId] = serializeDeserialize.deserialize(fieldId,
             textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
       } catch (Exception e) {
         values[fieldId] = NullDatum.get();
@@ -170,6 +175,11 @@ public class LazyTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public byte[] getTextBytes(int fieldId) {
+    return get(fieldId).asTextBytes();
+  }
+
+  @Override
   public short getInt2(int fieldId) {
     return get(fieldId).asInt2();
   }
@@ -200,6 +210,11 @@ public class LazyTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public TimeMeta getTimeDate(int fieldId) {
+    return get(fieldId).asTimeMeta();
+  }
+
+  @Override
   public ProtobufDatum getProtobufDatum(int fieldId) {
     throw new UnsupportedException();
   }
@@ -214,7 +229,9 @@ public class LazyTuple implements Tuple, Cloneable {
     return get(fieldId).asUnicodeChars();
   }
 
+  @Override
   public String toString() {
+    // todo this changes internal state, which causes funny result in GUI debugging
     boolean first = true;
     StringBuilder str = new StringBuilder();
     str.append("(");

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 256bc78..6643d45 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -47,7 +47,7 @@ public class RowStoreUtil {
   public static Tuple project(Tuple in, Tuple out, int[] targetIds) {
     out.clear();
     for (int idx = 0; idx < targetIds.length; idx++) {
-      out.put(idx, in.get(targetIds[idx]));
+      out.put(idx, in.asDatum(targetIds[idx]));
     }
     return out;
   }
@@ -76,7 +76,7 @@ public class RowStoreUtil {
     public Tuple toTuple(byte [] bytes) {
       nullFlags.clear();
       ByteBuffer bb = ByteBuffer.wrap(bytes);
-      Tuple tuple = new VTuple(schema.size());
+      VTuple tuple = new VTuple(schema.size());
       Column col;
       TajoDataTypes.DataType type;
 
@@ -189,7 +189,7 @@ public class RowStoreUtil {
       bb.position(headerSize);
       Column col;
       for (int i = 0; i < schema.size(); i++) {
-        if (tuple.isNull(i)) {
+        if (tuple.isBlankOrNull(i)) {
           nullFlags.set(i);
           continue;
         }
@@ -200,15 +200,15 @@ public class RowStoreUtil {
           nullFlags.set(i);
           break;
         case BOOLEAN:
-          bb.put(tuple.get(i).asByte());
+          bb.put(tuple.getByte(i));
           break;
         case BIT:
-          bb.put(tuple.get(i).asByte());
+          bb.put(tuple.getByte(i));
           break;
         case CHAR:
           int charSize = col.getDataType().getLength();
           byte [] _char = new byte[charSize];
-          byte [] src = tuple.get(i).asByteArray();
+          byte [] src = tuple.getBytes(i);
           if (charSize < src.length) {
             throw new ValueTooLongForTypeCharactersException(charSize);
           }
@@ -217,48 +217,48 @@ public class RowStoreUtil {
           bb.put(_char);
           break;
         case INT2:
-          bb.putShort(tuple.get(i).asInt2());
+          bb.putShort(tuple.getInt2(i));
           break;
         case INT4:
-          bb.putInt(tuple.get(i).asInt4());
+          bb.putInt(tuple.getInt4(i));
           break;
         case INT8:
-          bb.putLong(tuple.get(i).asInt8());
+          bb.putLong(tuple.getInt8(i));
           break;
         case FLOAT4:
-          bb.putFloat(tuple.get(i).asFloat4());
+          bb.putFloat(tuple.getFloat4(i));
           break;
         case FLOAT8:
-          bb.putDouble(tuple.get(i).asFloat8());
+          bb.putDouble(tuple.getFloat8(i));
           break;
         case TEXT:
-          byte[] _string = tuple.get(i).asByteArray();
+          byte[] _string = tuple.getBytes(i);
           bb.putInt(_string.length);
           bb.put(_string);
           break;
         case DATE:
-          bb.putInt(tuple.get(i).asInt4());
+          bb.putInt(tuple.getInt4(i));
           break;
         case TIME:
         case TIMESTAMP:
-          bb.putLong(tuple.get(i).asInt8());
+          bb.putLong(tuple.getInt8(i));
           break;
         case INTERVAL:
-          IntervalDatum interval = (IntervalDatum) tuple.get(i);
+          IntervalDatum interval = (IntervalDatum) tuple.getInterval(i);
           bb.putInt(interval.getMonths());
           bb.putLong(interval.getMilliSeconds());
           break;
         case BLOB:
-          byte[] bytes = tuple.get(i).asByteArray();
+          byte[] bytes = tuple.getBytes(i);
           bb.putInt(bytes.length);
           bb.put(bytes);
           break;
         case INET4:
-          byte[] ipBytes = tuple.get(i).asByteArray();
+          byte[] ipBytes = tuple.getBytes(i);
           bb.put(ipBytes);
           break;
         case INET6:
-          bb.put(tuple.get(i).asByteArray());
+          bb.put(tuple.getBytes(i));
           break;
         default:
           throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
@@ -283,7 +283,7 @@ public class RowStoreUtil {
       Column col;
 
       for (int i = 0; i < schema.size(); i++) {
-        if (tuple.isNull(i)) {
+        if (tuple.isBlankOrNull(i)) {
           continue;
         }
 
@@ -315,11 +315,11 @@ public class RowStoreUtil {
           break;
         case TEXT:
         case BLOB:
-          size += (4 + tuple.get(i).asByteArray().length);
+          size += (4 + tuple.getBytes(i).length);
           break;
         case INET4:
         case INET6:
-          size += tuple.get(i).asByteArray().length;
+          size += tuple.getBytes(i).length;
           break;
         default:
           throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
@@ -340,7 +340,7 @@ public class RowStoreUtil {
     writer.startRow();
 
     for (int i = 0; i < writer.dataTypes().length; i++) {
-      if (tuple.isNull(i)) {
+      if (tuple.isBlankOrNull(i)) {
         writer.skipField();
         continue;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
index 564a9f5..44cd214 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage;
 
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
 
 import java.io.IOException;
@@ -27,8 +28,10 @@ import java.io.OutputStream;
 @Deprecated
 public interface SerializerDeserializer {
 
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException;
+  public void init(Schema schema);
 
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
+  public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters) throws IOException;
+
+  public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
index a2c08de..c101b0b 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -20,13 +20,13 @@ package org.apache.tajo.storage;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
 
 /**
  * This class is not thread-safe.
@@ -34,8 +34,8 @@ import org.apache.tajo.datum.NullDatum;
 public class TableStatistics {
   private static final Log LOG = LogFactory.getLog(TableStatistics.class);
   private Schema schema;
-  private Tuple minValues;
-  private Tuple maxValues;
+  private VTuple minValues;
+  private VTuple maxValues;
   private long [] numNulls;
   private long numRows = 0;
   private long numBytes = 0;
@@ -81,12 +81,17 @@ public class TableStatistics {
     return this.numBytes;
   }
 
-  public void analyzeField(int idx, Datum datum) {
-    if (datum instanceof NullDatum) {
+  public void analyzeNull(int idx) {
+    numNulls[idx]++;
+  }
+
+  public void analyzeField(int idx, Tuple tuple) {
+    if (tuple.isBlankOrNull(idx)) {
       numNulls[idx]++;
       return;
     }
 
+    Datum datum = tuple.asDatum(idx);
     if (comparable[idx]) {
       if (!maxValues.contains(idx) ||
           maxValues.get(idx).compareTo(datum) < 0) {
@@ -102,21 +107,21 @@ public class TableStatistics {
   public TableStats getTableStat() {
     TableStats stat = new TableStats();
 
-    ColumnStats columnStats;
     for (int i = 0; i < schema.size(); i++) {
-      columnStats = new ColumnStats(schema.getColumn(i));
+      Column column = schema.getColumn(i);
+      ColumnStats columnStats = new ColumnStats(column);
       columnStats.setNumNulls(numNulls[i]);
-      if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) {
+      if (minValues.isBlank(i) || column.getDataType().getType() == minValues.type(i)) {
         columnStats.setMinValue(minValues.get(i));
       } else {
-        LOG.warn("Wrong statistics column type (" + minValues.get(i).type() +
-            ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
+        LOG.warn("Wrong statistics column type (" + minValues.type(i) +
+            ", expected=" + column.getDataType().getType() + ")");
       }
-      if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) {
+      if (minValues.isBlank(i) || column.getDataType().getType() == maxValues.type(i)) {
         columnStats.setMaxValue(maxValues.get(i));
       } else {
-        LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() +
-            ", expected=" + schema.getColumn(i).getDataType().getType() + ")");
+        LOG.warn("Wrong statistics column type (" + maxValues.type(i) +
+            ", expected=" + column.getDataType().getType() + ")");
       }
       stat.addColumnStat(columnStats);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
index 954b62d..1ec13bc 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
 import com.google.protobuf.Message;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
@@ -39,39 +40,48 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
   public static final byte[] falseBytes = "false".getBytes();
   private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
 
+  public TextSerializerDeserializer() {}
+
+  public TextSerializerDeserializer(Schema schema) {
+    init(schema);
+  }
+
+  private Schema schema;
+
   @Override
-  public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException {
+  public void init(Schema schema) {
+    this.schema = schema;
+  }
 
-    byte[] bytes;
-    int length = 0;
-    TajoDataTypes.DataType dataType = col.getDataType();
-
-    if (datum == null || datum instanceof NullDatum) {
-      switch (dataType.getType()) {
-        case CHAR:
-        case TEXT:
-          length = nullCharacters.length;
-          out.write(nullCharacters);
-          break;
-        default:
-          break;
+  @Override
+  public int serialize(int index, Tuple tuple, OutputStream out, byte[] nullCharacters)
+      throws IOException {
+
+    Column col = schema.getColumn(index);
+    TajoDataTypes.Type type = col.getDataType().getType();
+    if (tuple.isBlankOrNull(index)) {
+      if (type == TajoDataTypes.Type.CHAR || type == TajoDataTypes.Type.TEXT) {
+        out.write(nullCharacters);
+        return nullCharacters.length;
       }
-      return length;
+      return 0;
     }
 
-    switch (dataType.getType()) {
+    byte[] bytes;
+    int length = 0;
+    switch (type) {
       case BOOLEAN:
-        out.write(datum.asBool() ? trueBytes : falseBytes);
+        out.write(tuple.getBool(index) ? trueBytes : falseBytes);
         length = trueBytes.length;
         break;
       case CHAR:
-        int size = dataType.getLength() - datum.size();
+        int size = col.getDataType().getLength() - tuple.size(index);
         if (size < 0){
-          throw new ValueTooLongForTypeCharactersException(dataType.getLength());
+          throw new ValueTooLongForTypeCharactersException(col.getDataType().getLength());
         }
 
         byte[] pad = new byte[size];
-        bytes = datum.asTextBytes();
+        bytes = tuple.getTextBytes(index);
         out.write(bytes);
         out.write(pad);
         length = bytes.length + pad.length;
@@ -86,28 +96,28 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
       case INET4:
       case DATE:
       case INTERVAL:
-        bytes = datum.asTextBytes();
+        bytes = tuple.getTextBytes(index);
         length = bytes.length;
         out.write(bytes);
         break;
       case TIME:
-        bytes = ((TimeDatum)datum).asChars(TimeZone.getDefault(), true).getBytes();
+        bytes = TimeDatum.asChars(tuple.getTimeDate(index), TimeZone.getDefault(), true).getBytes();
         length = bytes.length;
         out.write(bytes);
         break;
       case TIMESTAMP:
-        bytes = ((TimestampDatum)datum).asChars(TimeZone.getDefault(), true).getBytes();
+        bytes = TimestampDatum.asChars(tuple.getTimeDate(index), TimeZone.getDefault(), true).getBytes();
         length = bytes.length;
         out.write(bytes);
         break;
       case INET6:
       case BLOB:
-        bytes = Base64.encodeBase64(datum.asByteArray(), false);
+        bytes = Base64.encodeBase64(tuple.getBytes(index), false);
         length = bytes.length;
         out.write(bytes, 0, length);
         break;
       case PROTOBUF:
-        ProtobufDatum protobuf = (ProtobufDatum) datum;
+        ProtobufDatum protobuf = (ProtobufDatum) tuple.getProtobufDatum(index);
         byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes();
         length = protoBytes.length;
         out.write(protoBytes, 0, protoBytes.length);
@@ -120,10 +130,13 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
   }
 
   @Override
-  public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+  public Datum deserialize(int index, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException {
+
+    Column col = schema.getColumn(index);
+    TajoDataTypes.Type type = col.getDataType().getType();
 
     Datum datum;
-    switch (col.getDataType().getType()) {
+    switch (type) {
       case BOOLEAN:
         datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get()
             : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T');

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
index 043409a..c42cdd6 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -22,8 +22,6 @@ import com.google.common.base.Objects;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 
-import java.util.Comparator;
-
 /**
  * It represents a pair of start and end tuples.
  */

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
index 33f9f1c..1f43ef8 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -21,6 +21,7 @@ package org.apache.tajo.tuple.offheap;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.Tuple;
@@ -29,6 +30,7 @@ import org.apache.tajo.util.SizeOf;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.UnsafeUtil;
 
+import org.apache.tajo.util.datetime.TimeMeta;
 import sun.misc.Unsafe;
 
 import java.nio.ByteBuffer;
@@ -53,6 +55,16 @@ public class HeapTuple implements Tuple {
     return data.length;
   }
 
+  @Override
+  public TajoDataTypes.Type type(int fieldId) {
+    return types[fieldId].getType();
+  }
+
+  @Override
+  public int size(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
   public ByteBuffer nioBuffer() {
     return ByteBuffer.wrap(data);
   }
@@ -75,13 +87,18 @@ public class HeapTuple implements Tuple {
   }
 
   @Override
-  public boolean isNull(int fieldid) {
+  public boolean isBlank(int fieldid) {
     return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
   }
 
   @Override
-  public boolean isNotNull(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  public boolean isBlankOrNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
   }
 
   @Override
@@ -95,23 +112,13 @@ public class HeapTuple implements Tuple {
   }
 
   @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
-  }
-
-  @Override
   public void put(Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum[]).");
   }
 
   @Override
-  public Datum get(int fieldId) {
-    if (isNull(fieldId)) {
+  public Datum asDatum(int fieldId) {
+    if (isBlankOrNull(fieldId)) {
       return NullDatum.get();
     }
 
@@ -184,6 +191,11 @@ public class HeapTuple implements Tuple {
   }
 
   @Override
+  public byte[] getTextBytes(int fieldId) {
+    return getText(fieldId).getBytes();
+  }
+
+  @Override
   public short getInt2(int fieldId) {
     return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
   }
@@ -213,6 +225,11 @@ public class HeapTuple implements Tuple {
     return new String(getBytes(fieldId));
   }
 
+  @Override
+  public TimeMeta getTimeDate(int fieldId) {
+    return asDatum(fieldId).asTimeMeta();
+  }
+
   public IntervalDatum getInterval(int fieldId) {
     long pos = checkNullAndGetOffset(fieldId);
     int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
@@ -257,7 +274,7 @@ public class HeapTuple implements Tuple {
     Datum [] datums = new Datum[size()];
     for (int i = 0; i < size(); i++) {
       if (contains(i)) {
-        datums[i] = get(i);
+        datums[i] = asDatum(i);
       } else {
         datums[i] = NullDatum.get();
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
index b742e6d..e7bd2aa 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.Message;
 
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.*;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.Tuple;
@@ -30,6 +31,7 @@ import org.apache.tajo.util.SizeOf;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.UnsafeUtil;
 
+import org.apache.tajo.util.datetime.TimeMeta;
 import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
 
@@ -63,6 +65,16 @@ public abstract class UnSafeTuple implements Tuple {
     return types.length;
   }
 
+  @Override
+  public TajoDataTypes.Type type(int fieldId) {
+    return types[fieldId].getType();
+  }
+
+  @Override
+  public int size(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
   public ByteBuffer nioBuffer() {
     return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
   }
@@ -105,13 +117,13 @@ public abstract class UnSafeTuple implements Tuple {
   }
 
   @Override
-  public boolean isNull(int fieldid) {
+  public boolean isBlank(int fieldid) {
     return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
   }
 
   @Override
-  public boolean isNotNull(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  public boolean isBlankOrNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
   }
 
   @Override
@@ -125,23 +137,18 @@ public abstract class UnSafeTuple implements Tuple {
   }
 
   @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
-  }
-
-  @Override
   public void put(int fieldId, Tuple tuple) {
     throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
   }
 
   @Override
   public void put(Datum[] values) {
-    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum[]).");
   }
 
   @Override
-  public Datum get(int fieldId) {
-    if (isNull(fieldId)) {
+  public Datum asDatum(int fieldId) {
+    if (isBlankOrNull(fieldId)) {
       return NullDatum.get();
     }
 
@@ -214,6 +221,17 @@ public abstract class UnSafeTuple implements Tuple {
   }
 
   @Override
+  public byte[] getTextBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte[] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
   public short getInt2(int fieldId) {
     long addr = getFieldAddr(fieldId);
     return UNSAFE.getShort(addr);
@@ -241,13 +259,7 @@ public abstract class UnSafeTuple implements Tuple {
 
   @Override
   public String getText(int fieldId) {
-    long pos = getFieldAddr(fieldId);
-    int len = UNSAFE.getInt(pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return new String(bytes);
+    return new String(getTextBytes(fieldId));
   }
 
   public IntervalDatum getInterval(int fieldId) {
@@ -285,6 +297,11 @@ public abstract class UnSafeTuple implements Tuple {
   }
 
   @Override
+  public TimeMeta getTimeDate(int fieldId) {
+    return null;
+  }
+
+  @Override
   public Tuple clone() throws CloneNotSupportedException {
     return toHeapTuple();
   }
@@ -294,7 +311,7 @@ public abstract class UnSafeTuple implements Tuple {
     Datum [] datums = new Datum[size()];
     for (int i = 0; i < size(); i++) {
       if (contains(i)) {
-        datums[i] = get(i);
+        datums[i] = asDatum(i);
       } else {
         datums[i] = NullDatum.get();
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
index 0251dc7..d0ee8e0 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -33,8 +33,7 @@ public class TestFrameTuple {
 
   @Before
   public void setUp() throws Exception {
-    tuple1 = new VTuple(11);
-    tuple1.put(new Datum[] {
+    tuple1 = new VTuple(new Datum[] {
         DatumFactory.createBool(true),
         DatumFactory.createBit((byte) 0x99),
         DatumFactory.createChar('9'),
@@ -48,8 +47,7 @@ public class TestFrameTuple {
         DatumFactory.createInet4("192.168.0.1")
     });
     
-    tuple2 = new VTuple(11);
-    tuple2.put(new Datum[] {
+    tuple2 = new VTuple(new Datum[] {
         DatumFactory.createBool(true),
         DatumFactory.createBit((byte) 0x99),
         DatumFactory.createChar('9'),
@@ -76,9 +74,9 @@ public class TestFrameTuple {
       assertTrue(frame.contains(i));
     }
     
-    assertEquals(DatumFactory.createInt8(23l), frame.get(5));
-    assertEquals(DatumFactory.createInt8(23l), frame.get(16));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
-    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
+    assertEquals(23l, frame.getInt8(5));
+    assertEquals(23l, frame.getInt8(16));
+    assertEquals("192.168.0.1", frame.getText(10));
+    assertEquals("192.168.0.1", frame.getText(21));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
index 9e7f334..96f90e7 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -71,6 +71,7 @@ public class TestLazyTuple {
     sb.append(NullDatum.get());
     textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|', 13);
     serde = new TextSerializerDeserializer();
+    serde.init(schema);
   }
 
   @Test
@@ -194,31 +195,6 @@ public class TestLazyTuple {
   }
 
   @Test
-  public void testPutTuple() {
-    int colNum = schema.size();
-    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(2, DatumFactory.createInt4(3));
-
-
-    Schema schema2 = new Schema();
-    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
-    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
-
-    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
-    t2.put(0, DatumFactory.createInt4(4));
-    t2.put(1, DatumFactory.createInt4(5));
-
-    t1.put(3, t2);
-
-    for (int i = 0; i < 5; i++) {
-      assertEquals(i + 1, t1.get(i).asInt4());
-    }
-  }
-
-  @Test
   public void testInvalidNumber() {
     byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|', 5);
     Schema schema = new Schema();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
index 5e94531..9eec96f 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -51,17 +51,14 @@ public class TestTupleComparator {
     schema.addColumn("col4", Type.INT4);
     schema.addColumn("col5", Type.TEXT);
     
-    Tuple tuple1 = new VTuple(5);
-    Tuple tuple2 = new VTuple(5);
-
-    tuple1.put(
+    VTuple tuple1 = new VTuple(
         new Datum[] {
         DatumFactory.createInt4(9),
         DatumFactory.createInt4(3),
         DatumFactory.createInt4(33),
         DatumFactory.createInt4(4),
         DatumFactory.createText("abc")});
-    tuple2.put(
+    VTuple tuple2 = new VTuple(
         new Datum[] {
         DatumFactory.createInt4(1),
         DatumFactory.createInt4(25),

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
index 1bbd9ec..4ef595c 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -121,34 +121,15 @@ public class TestVTuple {
 	}
 
   @Test
-  public void testPutTuple() {
-    Tuple t1 = new VTuple(5);
-
-    t1.put(0, DatumFactory.createInt4(1));
-    t1.put(1, DatumFactory.createInt4(2));
-    t1.put(2, DatumFactory.createInt4(3));
-
-    Tuple t2 = new VTuple(2);
-    t2.put(0, DatumFactory.createInt4(4));
-    t2.put(1, DatumFactory.createInt4(5));
-
-    t1.put(3, t2);
-
-    for (int i = 0; i < 5; i++) {
-      assertEquals(i+1, t1.get(i).asInt4());
-    }
-  }
-
-  @Test
   public void testClone() throws CloneNotSupportedException {
-    Tuple t1 = new VTuple(5);
+    VTuple t1 = new VTuple(5);
 
     t1.put(0, DatumFactory.createInt4(1));
     t1.put(1, DatumFactory.createInt4(2));
     t1.put(3, DatumFactory.createInt4(2));
     t1.put(4, DatumFactory.createText("str"));
 
-    VTuple t2 = (VTuple) t1.clone();
+    VTuple t2 = t1.clone();
     assertNotSame(t1, t2);
     assertEquals(t1, t2);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
index c43ba38..278d733 100644
--- a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
@@ -497,79 +497,79 @@ public class TestOffHeapRowBlock {
 
   public static void validateNullity(int j, Tuple tuple) {
     if (j == 0) {
-      tuple.isNull(0);
+      tuple.isBlankOrNull(0);
     } else {
       assertTrue((j % 1 == 0) == tuple.getBool(0));
     }
 
     if (j % 1 == 0) {
-      tuple.isNull(1);
+      tuple.isBlankOrNull(1);
     } else {
       assertTrue(1 == tuple.getInt2(1));
     }
 
     if (j % 2 == 0) {
-      tuple.isNull(2);
+      tuple.isBlankOrNull(2);
     } else {
       assertEquals(j, tuple.getInt4(2));
     }
 
     if (j % 3 == 0) {
-      tuple.isNull(3);
+      tuple.isBlankOrNull(3);
     } else {
       assertEquals(j, tuple.getInt8(3));
     }
 
     if (j % 4 == 0) {
-      tuple.isNull(4);
+      tuple.isBlankOrNull(4);
     } else {
       assertTrue(j == tuple.getFloat4(4));
     }
 
     if (j % 5 == 0) {
-      tuple.isNull(5);
+      tuple.isBlankOrNull(5);
     } else {
       assertTrue(j == tuple.getFloat8(5));
     }
 
     if (j % 6 == 0) {
-      tuple.isNull(6);
+      tuple.isBlankOrNull(6);
     } else {
       assertEquals(new String(UNICODE_FIELD_PREFIX + j), tuple.getText(6));
     }
 
     if (j % 7 == 0) {
-      tuple.isNull(7);
+      tuple.isBlankOrNull(7);
     } else {
       assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
     }
 
     if (j % 8 == 0) {
-      tuple.isNull(8);
+      tuple.isBlankOrNull(8);
     } else {
       assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
     }
 
     if (j % 9 == 0) {
-      tuple.isNull(9);
+      tuple.isBlankOrNull(9);
     } else {
       assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
     }
 
     if (j % 10 == 0) {
-      tuple.isNull(10);
+      tuple.isBlankOrNull(10);
     } else {
       assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
     }
 
     if (j % 11 == 0) {
-      tuple.isNull(11);
+      tuple.isBlankOrNull(11);
     } else {
       assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
     }
 
     if (j % 12 == 0) {
-      tuple.isNull(12);
+      tuple.isBlankOrNull(12);
     } else {
       assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
index 0e3441b..425f392 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -151,11 +151,10 @@ public abstract class AbstractHBaseAppender implements Appender {
     if (rowkeyColumnIndexes.length > 1) {
       bout.reset();
       for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
-        datum = tuple.get(rowkeyColumnIndexes[i]);
         if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
-          rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+          rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), tuple, i);
         } else {
-          rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+          rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), tuple, i);
         }
         bout.write(rowkey);
         if (i < rowkeyColumnIndexes.length - 1) {
@@ -165,11 +164,10 @@ public abstract class AbstractHBaseAppender implements Appender {
       rowkey = bout.toByteArray();
     } else {
       int index = rowkeyColumnIndexes[0];
-      datum = tuple.get(index);
       if (isBinaryColumns[index]) {
-        rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
+        rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), tuple, index);
       } else {
-        rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
+        rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), tuple, index);
       }
     }
 
@@ -182,12 +180,11 @@ public abstract class AbstractHBaseAppender implements Appender {
       if (isRowKeyMappings[i]) {
         continue;
       }
-      Datum datum = tuple.get(i);
       byte[] value;
       if (isBinaryColumns[i]) {
-        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
       } else {
-        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
       }
 
       if (isColumnKeys[i]) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
index 53ff9dc..40c4aea 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
@@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.Bytes;
 
 import java.io.IOException;
@@ -98,4 +99,38 @@ public class HBaseBinarySerializerDeserializer {
 
     return bytes;
   }
+
+  public static byte[] serialize(Column col, Tuple tuple, int index) throws IOException {
+    if (tuple.isBlankOrNull(index)) {
+      return null;
+    }
+
+    byte[] bytes;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        bytes = Bytes.toBytes(tuple.getInt2(index));
+        break;
+      case INT4:
+        bytes = Bytes.toBytes(tuple.getInt4(index));
+        break;
+      case INT8:
+        bytes = Bytes.toBytes(tuple.getInt8(index));
+        break;
+      case FLOAT4:
+        bytes = Bytes.toBytes(tuple.getFloat4(index));
+        break;
+      case FLOAT8:
+        bytes = Bytes.toBytes(tuple.getFloat8(index));
+        break;
+      case TEXT:
+        bytes = Bytes.toBytes(tuple.getText(index));
+        break;
+      default:
+        bytes = null;
+        break;
+    }
+
+    return bytes;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index b1a2d59..19fdf80 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -27,7 +27,6 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.Tuple;
 
@@ -65,12 +64,11 @@ public class HBasePutAppender extends AbstractHBaseAppender {
       if (isRowKeyMappings[i]) {
         continue;
       }
-      Datum datum = tuple.get(i);
       byte[] value;
       if (isBinaryColumns[i]) {
-        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
       } else {
-        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), tuple, i);
       }
 
       if (isColumnKeys[i]) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 670f87e..845c2d7 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -1008,7 +1008,7 @@ public class HBaseTablespace extends Tablespace {
         Tuple previousTuple = dataRange.getStart();
 
         for (byte[] eachEndKey : endKeys) {
-          Tuple endTuple = new VTuple(sortSpecs.length);
+          VTuple endTuple = new VTuple(sortSpecs.length);
           byte[][] rowKeyFields;
           if (sortSpecs.length > 1) {
             byte[][] splitValues = BytesUtils.splitPreserveAllTokens(

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
index a0ad492..ea5d0b0 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java
@@ -22,6 +22,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.NumberUtil;
 
 import java.io.IOException;
@@ -68,4 +69,12 @@ public class HBaseTextSerializerDeserializer {
 
     return datum.asChars().getBytes();
   }
+
+  public static byte[] serialize(Column col, Tuple tuple, int index) throws IOException {
+    if (tuple.isBlankOrNull(index)) {
+      return null;
+    }
+
+    return tuple.getBytes(index);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
index 5fc96f1..ee3095c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -33,7 +33,6 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.compress.CodecPool;
@@ -149,6 +148,7 @@ public class CSVFile {
         String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
             TextSerializerDeserializer.class.getName());
         serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+        serde.init(schema);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);
@@ -163,12 +163,10 @@ public class CSVFile {
 
     @Override
     public void addTuple(Tuple tuple) throws IOException {
-      Datum datum;
       int rowBytes = 0;
 
       for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-        rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars);
+        rowBytes += serde.serialize(i, tuple, os, nullChars);
 
         if(columnNum - 1 > i){
           os.write(delimiter);
@@ -176,7 +174,7 @@ public class CSVFile {
         }
         if (isShuffle) {
           // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(i, datum);
+          stats.analyzeField(i, tuple);
         }
       }
       os.write(LF);
@@ -358,6 +356,7 @@ public class CSVFile {
         String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE,
             TextSerializerDeserializer.class.getName());
         serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+        serde.init(schema);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
index 0b3755d..cfd5a79 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.storage.text.TextLineParsingError;
 
@@ -29,9 +30,11 @@ import java.io.OutputStream;
 
 public interface FieldSerializerDeserializer {
 
-  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
+  void init(Schema schema);
 
-  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars)
+  int serialize(int columnIndex, Tuple datum, OutputStream out, byte[] nullChars) throws IOException;
+
+  Datum deserialize(int columnIndex, ByteBuf buf, ByteBuf nullChars)
       throws IOException, TextLineParsingError;
 
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 5213ba0..4e9bcda 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -634,10 +634,10 @@ public class RawFile {
       nullFlags.clear();
       for (int i = 0; i < schema.size(); i++) {
         if (enabledStats) {
-          stats.analyzeField(i, t.get(i));
+          stats.analyzeField(i, t);
         }
 
-        if (t.isNull(i)) {
+        if (t.isBlankOrNull(i)) {
           nullFlags.set(i);
           continue;
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 1ff6c4f..0e628d4 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -376,56 +376,56 @@ public class RowFile {
 
       for (int i = 0; i < schema.size(); i++) {
         if (enabledStats) {
-          stats.analyzeField(i, t.get(i));
+          stats.analyzeField(i, t);
         }
 
-        if (t.isNull(i)) {
+        if (t.isBlankOrNull(i)) {
           nullFlags.set(i);
         } else {
           col = schema.getColumn(i);
           switch (col.getDataType().getType()) {
             case BOOLEAN:
-              buffer.put(t.get(i).asByte());
+              buffer.put(t.getByte(i));
               break;
             case BIT:
-              buffer.put(t.get(i).asByte());
+              buffer.put(t.getByte(i));
               break;
             case CHAR:
-              byte[] src = t.get(i).asByteArray();
+              byte[] src = t.getBytes(i);
               byte[] dst = Arrays.copyOf(src, col.getDataType().getLength());
               buffer.putInt(src.length);
               buffer.put(dst);
               break;
             case TEXT:
-              byte [] strbytes = t.get(i).asByteArray();
+              byte [] strbytes = t.getBytes(i);
               buffer.putShort((short)strbytes.length);
               buffer.put(strbytes, 0, strbytes.length);
               break;
             case INT2:
-              buffer.putShort(t.get(i).asInt2());
+              buffer.putShort(t.getInt2(i));
               break;
             case INT4:
-              buffer.putInt(t.get(i).asInt4());
+              buffer.putInt(t.getInt4(i));
               break;
             case INT8:
-              buffer.putLong(t.get(i).asInt8());
+              buffer.putLong(t.getInt8(i));
               break;
             case FLOAT4:
-              buffer.putFloat(t.get(i).asFloat4());
+              buffer.putFloat(t.getFloat4(i));
               break;
             case FLOAT8:
-              buffer.putDouble(t.get(i).asFloat8());
+              buffer.putDouble(t.getFloat8(i));
               break;
             case BLOB:
-              byte [] bytes = t.get(i).asByteArray();
+              byte [] bytes = t.getBytes(i);
               buffer.putShort((short)bytes.length);
               buffer.put(bytes);
               break;
             case INET4:
-              buffer.put(t.get(i).asByteArray());
+              buffer.put(t.getBytes(i));
               break;
             case INET6:
-              buffer.put(t.get(i).asByteArray());
+              buffer.put(t.getBytes(i));
               break;
             case NULL_TYPE:
               nullFlags.set(i);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index da426ea..2782955 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -33,7 +33,6 @@ import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.FileAppender;
 import org.apache.tajo.storage.TableStatistics;
 import org.apache.tajo.storage.Tuple;
@@ -104,7 +103,7 @@ public class AvroAppender extends FileAppender {
   }
 
   private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
-    if (tuple.get(i) instanceof NullDatum) {
+    if (tuple.isBlankOrNull(i)) {
       return null;
     }
     switch (avroType) {
@@ -141,7 +140,7 @@ public class AvroAppender extends FileAppender {
     for (int i = 0; i < schema.size(); ++i) {
       Column column = schema.getColumn(i);
       if (enabledStats) {
-        stats.analyzeField(i, tuple.get(i));
+        stats.analyzeField(i, tuple);
       }
       Object value;
       Schema.Field avroField = avroFields.get(i);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
index 34e9661..60c32a7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
@@ -56,7 +56,7 @@ public class JsonLineSerializer extends TextLineSerializer {
     JSONObject jsonObject = new JSONObject();
 
     for (int i = 0; i < columnNum; i++) {
-      if (input.isNull(i)) {
+      if (input.isBlankOrNull(i)) {
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 415c338..45960aa 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -108,7 +108,7 @@ public class ParquetAppender extends FileAppender {
   public void addTuple(Tuple tuple) throws IOException {
     if (enabledStats) {
       for (int i = 0; i < schema.size(); ++i) {
-        stats.analyzeField(i, tuple.get(i));
+        stats.analyzeField(i, tuple);
       }
     }
     writer.write(tuple);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 4c675a4..7f236b6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -158,7 +158,7 @@ public class TajoRecordConverter extends GroupConverter {
       final int projectionIndex = projectionMap[i];
       Column column = tajoReadSchema.getColumn(projectionIndex);
       if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
-          || currentTuple.get(i) == null) {
+          || currentTuple.isBlankOrNull(i)) {
         set(projectionIndex, NullDatum.get());
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index dd951e1..de2a1e3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
 import org.apache.tajo.storage.Tuple;
 import parquet.hadoop.api.WriteSupport;
@@ -99,11 +98,10 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
       if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
         continue;
       }
-      Datum datum = tuple.get(tajoIndex);
       Type fieldType = fields.get(index);
-      if (!tuple.isNull(tajoIndex)) {
+      if (!tuple.isBlankOrNull(tajoIndex)) {
         recordConsumer.startField(fieldType.getName(), index);
-        writeValue(fieldType, column, datum);
+        writeValue(fieldType, column, tuple, tajoIndex);
         recordConsumer.endField(fieldType.getName(), index);
       } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
         throw new RuntimeException("Null-value for required field: " +
@@ -113,40 +111,40 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
     }
   }
 
-  private void writeValue(Type fieldType, Column column, Datum datum) {
+  private void writeValue(Type fieldType, Column column, Tuple tuple, int index) {
     switch (column.getDataType().getType()) {
       case BOOLEAN:
-        recordConsumer.addBoolean(datum.asBool());
+        recordConsumer.addBoolean(tuple.getBool(index));
         break;
       case BIT:
       case INT2:
       case INT4:
-        recordConsumer.addInteger(datum.asInt4());
+        recordConsumer.addInteger(tuple.getInt4(index));
         break;
       case INT8:
-        recordConsumer.addLong(datum.asInt8());
+        recordConsumer.addLong(tuple.getInt8(index));
         break;
       case FLOAT4:
-        recordConsumer.addFloat(datum.asFloat4());
+        recordConsumer.addFloat(tuple.getFloat4(index));
         break;
       case FLOAT8:
-        recordConsumer.addDouble(datum.asFloat8());
+        recordConsumer.addDouble(tuple.getFloat8(index));
         break;
       case CHAR:
-        if (datum.size() > column.getDataType().getLength()) {
+        if (tuple.size(index) > column.getDataType().getLength()) {
           throw new ValueTooLongForTypeCharactersException(column.getDataType().getLength());
         }
 
-        recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
+        recordConsumer.addBinary(Binary.fromByteArray(tuple.getTextBytes(index)));
         break;
       case TEXT:
-        recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
+        recordConsumer.addBinary(Binary.fromByteArray(tuple.getBytes(index)));
         break;
       case PROTOBUF:
       case BLOB:
       case INET4:
       case INET6:
-        recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
+        recordConsumer.addBinary(Binary.fromByteArray(tuple.getBytes(index)));
         break;
       default:
         break;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index af260b4..1dcec5f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.compress.*;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -646,8 +645,12 @@ public class RCFile {
         valLenBuffer = new NonSyncByteArrayOutputStream();
       }
 
-      public int append(Column column, Datum datum) throws IOException {
-        int currentLen = serde.serialize(column, datum, columnValBuffer, nullChars);
+      public int append(NullDatum nill) {
+        return nullChars.length;
+      }
+
+      public int append(Tuple tuple, int i) throws IOException {
+        int currentLen = serde.serialize(i, tuple, columnValBuffer, nullChars);
         columnValueLength += currentLen;
         uncompressedColumnValueLength += currentLen;
 
@@ -765,6 +768,7 @@ public class RCFile {
           BinarySerializerDeserializer.class.getName());
       try {
         serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+        serde.init(schema);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);
@@ -892,20 +896,19 @@ public class RCFile {
       int size = schema.size();
 
       for (int i = 0; i < size; i++) {
-        Datum datum = tuple.get(i);
-        int length = columnBuffers[i].append(schema.getColumn(i), datum);
+        int length = columnBuffers[i].append(tuple, i);
         columnBufferSize += length;
         if (isShuffle) {
           // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(i, datum);
+          stats.analyzeField(i, tuple);
         }
       }
 
       if (size < columnNumber) {
         for (int i = size; i < columnNumber; i++) {
-          columnBuffers[i].append(schema.getColumn(i), NullDatum.get());
+          columnBuffers[i].append(NullDatum.get());
           if (isShuffle) {
-            stats.analyzeField(i, NullDatum.get());
+            stats.analyzeNull(i);
           }
         }
       }
@@ -1377,6 +1380,7 @@ public class RCFile {
           serdeClass = this.meta.getOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
         }
         serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+        serde.init(schema);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         throw new IOException(e);
@@ -1712,7 +1716,7 @@ public class RCFile {
         } else {
           colAdvanceRow(j, col);
 
-          Datum datum = serde.deserialize(schema.getColumn(actualColumnIdx),
+          Datum datum = serde.deserialize(actualColumnIdx,
               currentValue.loadedColumnsValueBuffer[j].getData(), col.rowReadIndex, col.prvLength, nullChars);
           tuple.put(j, datum);
           col.rowReadIndex += col.prvLength;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index 404352c..9b09d78 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -35,7 +35,6 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.storage.*;
@@ -125,6 +124,7 @@ public class SequenceFileAppender extends FileAppender {
       String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE,
           TextSerializerDeserializer.class.getName());
       serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+      serde.init(schema);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
       throw new IOException(e);
@@ -159,16 +159,13 @@ public class SequenceFileAppender extends FileAppender {
 
   @Override
   public void addTuple(Tuple tuple) throws IOException {
-    Datum datum;
 
     if (serde instanceof BinarySerializerDeserializer) {
       byte nullByte = 0;
       int lasti = 0;
       for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-
         // set bit to 1 if a field is not null
-        if (null != datum) {
+        if (!tuple.isBlank(i)) {
           nullByte |= 1 << (i % 8);
         }
 
@@ -179,29 +176,28 @@ public class SequenceFileAppender extends FileAppender {
           os.write(nullByte);
 
           for (int j = lasti; j <= i; j++) {
-            datum = tuple.get(j);
 
             switch (schema.getColumn(j).getDataType().getType()) {
               case TEXT:
-                BytesUtils.writeVLong(os, datum.asTextBytes().length);
+                BytesUtils.writeVLong(os, tuple.getTextBytes(j).length);
                 break;
               case PROTOBUF:
-                ProtobufDatum protobufDatum = (ProtobufDatum) datum;
+                ProtobufDatum protobufDatum = (ProtobufDatum) tuple.getProtobufDatum(j);
                 BytesUtils.writeVLong(os, protobufDatum.asByteArray().length);
                 break;
               case CHAR:
               case INET4:
               case BLOB:
-                BytesUtils.writeVLong(os, datum.asByteArray().length);
+                BytesUtils.writeVLong(os, tuple.getBytes(j).length);
                 break;
               default:
             }
 
-            serde.serialize(schema.getColumn(j), datum, os, nullChars);
+            serde.serialize(j, tuple, os, nullChars);
 
             if (isShuffle) {
               // it is to calculate min/max values, and it is only used for the intermediate file.
-              stats.analyzeField(j, datum);
+              stats.analyzeField(j, tuple);
             }
           }
           lasti = i + 1;
@@ -215,8 +211,7 @@ public class SequenceFileAppender extends FileAppender {
 
     } else {
       for (int i = 0; i < columnNum; i++) {
-        datum = tuple.get(i);
-        serde.serialize(schema.getColumn(i), datum, os, nullChars);
+        serde.serialize(i, tuple, os, nullChars);
 
         if (columnNum -1 > i) {
           os.write((byte) delimiter);
@@ -224,7 +219,7 @@ public class SequenceFileAppender extends FileAppender {
 
         if (isShuffle) {
           // it is to calculate min/max values, and it is only used for the intermediate file.
-          stats.analyzeField(i, datum);
+          stats.analyzeField(i, tuple);
         }
 
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index af0973e..ff73a1c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -117,6 +117,7 @@ public class SequenceFileScanner extends FileScanner {
     try {
       String serdeClass = this.meta.getOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
       serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance();
+      serde.init(schema);
 
       if (serde instanceof BinarySerializerDeserializer) {
         hasBinarySerDe = true;
@@ -225,7 +226,7 @@ public class SequenceFileScanner extends FileScanner {
 
         for (int j = 0; j < projectionMap.length; j++) {
           if (projectionMap[j] == i) {
-            Datum datum = serde.deserialize(schema.getColumn(i), bytes, fieldStart[i], fieldLength[i], nullChars);
+            Datum datum = serde.deserialize(i, bytes, fieldStart[i], fieldLength[i], nullChars);
             tuple.put(i, datum);
           }
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c2aee23/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 0901c0b..eabab22 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.storage.text;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Column;
 import io.netty.buffer.ByteBufProcessor;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
@@ -38,12 +38,10 @@ public class CSVLineDeserializer extends TextLineDeserializer {
   private int delimiterCompensation;
 
   private int [] targetColumnIndexes;
-  private Column [] projected;
 
   public CSVLineDeserializer(Schema schema, TableMeta meta, Column [] projected) {
     super(schema, meta);
 
-    this.projected = projected;
     targetColumnIndexes = new int[projected.length];
     for (int i = 0; i < projected.length; i++) {
       targetColumnIndexes[i] = schema.getColumnId(projected[i].getQualifiedName());
@@ -67,13 +65,14 @@ public class CSVLineDeserializer extends TextLineDeserializer {
     nullChars = TextLineSerDe.getNullChars(meta);
 
     fieldSerDer = new TextFieldSerializerDeserializer(meta);
+    fieldSerDer.init(schema);
   }
 
   public void deserialize(final ByteBuf lineBuf, Tuple output) throws IOException, TextLineParsingError {
-    int[] projection = targetColumnIndexes;
     if (lineBuf == null || targetColumnIndexes == null || targetColumnIndexes.length == 0) {
       return;
     }
+    int[] projection = targetColumnIndexes;
 
     final int rowLength = lineBuf.readableBytes();
     int start = 0, fieldLength = 0, end = 0;
@@ -93,14 +92,12 @@ public class CSVLineDeserializer extends TextLineDeserializer {
 
       if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
         lineBuf.setIndex(start, start + fieldLength);
-
         try {
-          Datum datum = fieldSerDer.deserialize(lineBuf, projected[currentTarget], currentIndex, nullChars);
+          Datum datum = fieldSerDer.deserialize(currentIndex, lineBuf, nullChars);
           output.put(currentTarget, datum);
         } catch (Exception e) {
           output.put(currentTarget, NullDatum.get());
         }
-
         currentTarget++;
       }