You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/09/16 13:06:10 UTC

[3/6] TAJO-907: Implement off-heap tuple block and zero-copy tuple.

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
new file mode 100644
index 0000000..c3f77e7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BaseTupleComparator.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.datum.Datum;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * The Comparator class for Tuples
+ * 
+ * @see Tuple
+ */
+public class BaseTupleComparator extends TupleComparator implements ProtoObject<TupleComparatorProto> {
+  private final Schema schema;
+  private final SortSpec [] sortSpecs;
+  private final int[] sortKeyIds;
+  private final boolean[] asc;
+  @SuppressWarnings("unused")
+  private final boolean[] nullFirsts;  
+
+  private Datum left;
+  private Datum right;
+  private int compVal;
+
+  /**
+   * @param schema The schema of input tuples
+   * @param sortKeys The description of sort keys
+   */
+  public BaseTupleComparator(Schema schema, SortSpec[] sortKeys) {
+    Preconditions.checkArgument(sortKeys.length > 0, 
+        "At least one sort key must be specified.");
+
+    this.schema = schema;
+    this.sortSpecs = sortKeys;
+    this.sortKeyIds = new int[sortKeys.length];
+    this.asc = new boolean[sortKeys.length];
+    this.nullFirsts = new boolean[sortKeys.length];
+    for (int i = 0; i < sortKeys.length; i++) {
+      if (sortKeys[i].getSortKey().hasQualifier()) {
+        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
+      } else {
+        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
+      }
+          
+      this.asc[i] = sortKeys[i].isAscending();
+      this.nullFirsts[i]= sortKeys[i].isNullFirst();
+    }
+  }
+
+  public BaseTupleComparator(TupleComparatorProto proto) {
+    this.schema = new Schema(proto.getSchema());
+
+    this.sortSpecs = new SortSpec[proto.getSortSpecsCount()];
+    for (int i = 0; i < proto.getSortSpecsCount(); i++) {
+      sortSpecs[i] = new SortSpec(proto.getSortSpecs(i));
+    }
+
+    this.sortKeyIds = new int[proto.getCompSpecsCount()];
+    this.asc = new boolean[proto.getCompSpecsCount()];
+    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
+
+    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
+      TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
+      sortKeyIds[i] = sortSepcProto.getColumnId();
+      asc[i] = sortSepcProto.getAscending();
+      nullFirsts[i] = sortSepcProto.getNullFirst();
+    }
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public SortSpec [] getSortSpecs() {
+    return sortSpecs;
+  }
+
+  public int [] getSortKeyIds() {
+    return sortKeyIds;
+  }
+
+  public boolean isAscendingFirstKey() {
+    return this.asc[0];
+  }
+
+  @Override
+  public int compare(Tuple tuple1, Tuple tuple2) {
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      left = tuple1.get(sortKeyIds[i]);
+      right = tuple2.get(sortKeyIds[i]);
+
+      if (left.isNull() || right.isNull()) {
+        if (!left.equals(right)) {
+          if (left.isNull()) {
+            compVal = 1;
+          } else if (right.isNull()) {
+            compVal = -1;
+          }
+          if (nullFirsts[i]) {
+            if (compVal != 0) {
+              compVal *= -1;
+            }
+          }
+        } else {
+          compVal = 0;
+        }
+      } else {
+        if (asc[i]) {
+          compVal = left.compareTo(right);
+        } else {
+          compVal = right.compareTo(left);
+        }
+      }
+
+      if (compVal < 0 || compVal > 0) {
+        return compVal;
+      }
+    }
+    return 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(sortKeyIds);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof BaseTupleComparator) {
+      BaseTupleComparator other = (BaseTupleComparator) obj;
+      if (sortKeyIds.length != other.sortKeyIds.length) {
+        return false;
+      }
+
+      for (int i = 0; i < sortKeyIds.length; i++) {
+        if (sortKeyIds[i] != other.sortKeyIds[i] ||
+            asc[i] != other.asc[i] ||
+            nullFirsts[i] != other.nullFirsts[i]) {
+          return false;
+        }
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public TupleComparatorProto getProto() {
+    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
+    builder.setSchema(schema.getProto());
+    for (int i = 0; i < sortSpecs.length; i++) {
+      builder.addSortSpecs(sortSpecs[i].getProto());
+    }
+
+    TupleComparatorSpecProto.Builder sortSpecBuilder;
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
+      sortSpecBuilder.setColumnId(sortKeyIds[i]);
+      sortSpecBuilder.setAscending(asc[i]);
+      sortSpecBuilder.setNullFirst(nullFirsts[i]);
+      builder.addCompSpecs(sortSpecBuilder);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+
+    String prefix = "";
+    for (int i = 0; i < sortKeyIds.length; i++) {
+      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
+        .append(",Asc=").append(asc[i])
+        .append(",NullFirst=").append(nullFirsts[i]);
+      prefix = " ,";
+    }
+    return sb.toString();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
index 42b49a8..609a3df 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/BinarySerializerDeserializer.java
@@ -36,7 +36,7 @@ public class BinarySerializerDeserializer implements SerializerDeserializer {
       throws IOException {
     byte[] bytes;
     int length = 0;
-    if (datum == null || datum instanceof NullDatum) {
+    if (datum == null || datum.isNull()) {
       return 0;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
index e0f8a2e..8b7e2e0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
@@ -23,7 +23,7 @@ package org.apache.tajo.storage;
 
 import com.google.common.base.Preconditions;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
 
@@ -71,7 +71,12 @@ public class FrameTuple implements Tuple, Cloneable {
 
   @Override
   public boolean isNull(int fieldid) {
-    return get(fieldid) instanceof NullDatum;
+    return get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -177,6 +182,11 @@ public class FrameTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) get(fieldId);
+  }
+
+  @Override
   public char [] getUnicodeChars(int fieldId) {
     return get(fieldId).asUnicodeChars();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
index 167e4a8..bfbe478 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.IntervalDatum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
@@ -68,7 +69,12 @@ public class LazyTuple implements Tuple, Cloneable {
 
   @Override
   public boolean isNull(int fieldid) {
-    return get(fieldid) instanceof NullDatum;
+    return get(fieldid).isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -199,6 +205,11 @@ public class LazyTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) get(fieldId);
+  }
+
+  @Override
   public char[] getUnicodeChars(int fieldId) {
     return get(fieldId).asUnicodeChars();
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
deleted file mode 100644
index f19b61f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MemoryUtil.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.datum.*;
-import org.apache.tajo.util.ClassSize;
-
-public class MemoryUtil {
-
-  /** Overhead for an NullDatum */
-  public static final long NULL_DATUM;
-
-  /** Overhead for an BoolDatum */
-  public static final long BOOL_DATUM;
-
-  /** Overhead for an CharDatum */
-  public static final long CHAR_DATUM;
-
-  /** Overhead for an BitDatum */
-  public static final long BIT_DATUM;
-
-  /** Overhead for an Int2Datum */
-  public static final long INT2_DATUM;
-
-  /** Overhead for an Int4Datum */
-  public static final long INT4_DATUM;
-
-  /** Overhead for an Int8Datum */
-  public static final long INT8_DATUM;
-
-  /** Overhead for an Float4Datum */
-  public static final long FLOAT4_DATUM;
-
-  /** Overhead for an Float8Datum */
-  public static final long FLOAT8_DATUM;
-
-  /** Overhead for an TextDatum */
-  public static final long TEXT_DATUM;
-
-  /** Overhead for an BlobDatum */
-  public static final long BLOB_DATUM;
-
-  /** Overhead for an DateDatum */
-  public static final long DATE_DATUM;
-
-  /** Overhead for an TimeDatum */
-  public static final long TIME_DATUM;
-
-  /** Overhead for an TimestampDatum */
-  public static final long TIMESTAMP_DATUM;
-
-  static {
-    NULL_DATUM = ClassSize.estimateBase(NullDatum.class, false);
-
-    CHAR_DATUM = ClassSize.estimateBase(CharDatum.class, false);
-
-    BOOL_DATUM = ClassSize.estimateBase(BooleanDatum.class, false);
-
-    BIT_DATUM = ClassSize.estimateBase(BitDatum.class, false);
-
-    INT2_DATUM = ClassSize.estimateBase(Int2Datum.class, false);
-
-    INT4_DATUM = ClassSize.estimateBase(Int4Datum.class, false);
-
-    INT8_DATUM = ClassSize.estimateBase(Int8Datum.class, false);
-
-    FLOAT4_DATUM = ClassSize.estimateBase(Float4Datum.class, false);
-
-    FLOAT8_DATUM = ClassSize.estimateBase(Float8Datum.class, false);
-
-    TEXT_DATUM = ClassSize.estimateBase(TextDatum.class, false);
-
-    BLOB_DATUM = ClassSize.estimateBase(BlobDatum.class, false);
-
-    DATE_DATUM = ClassSize.estimateBase(DateDatum.class, false);
-
-    TIME_DATUM = ClassSize.estimateBase(TimeDatum.class, false);
-
-    TIMESTAMP_DATUM = ClassSize.estimateBase(TimestampDatum.class, false);
-  }
-
-  public static long calculateMemorySize(Tuple tuple) {
-    long total = ClassSize.OBJECT;
-    for (Datum datum : tuple.getValues()) {
-      switch (datum.type()) {
-
-      case NULL_TYPE:
-        total += NULL_DATUM;
-        break;
-
-      case BOOLEAN:
-        total += BOOL_DATUM;
-        break;
-
-      case BIT:
-        total += BIT_DATUM;
-        break;
-
-      case CHAR:
-        total += CHAR_DATUM + datum.size();
-        break;
-
-      case INT1:
-      case INT2:
-        total += INT2_DATUM;
-        break;
-
-      case INT4:
-        total += INT4_DATUM;
-        break;
-
-      case INT8:
-        total += INT8_DATUM;
-        break;
-
-      case FLOAT4:
-        total += FLOAT4_DATUM;
-        break;
-
-      case FLOAT8:
-        total += FLOAT4_DATUM;
-        break;
-
-      case TEXT:
-        total += TEXT_DATUM + datum.size();
-        break;
-
-      case DATE:
-        total += DATE_DATUM;
-        break;
-
-      case TIME:
-        total += TIME_DATUM;
-        break;
-
-      case TIMESTAMP:
-        total += TIMESTAMP_DATUM;
-        break;
-
-      default:
-        break;
-      }
-    }
-
-    return total;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index 1f57675..7f729e1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -31,6 +31,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
+import org.apache.tajo.util.UnsafeUtil;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.BitArray;
 
@@ -42,6 +43,8 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 
 public class RawFile {
+  public static final String FILE_EXTENSION = "raw";
+
   private static final Log LOG = LogFactory.getLog(RawFile.class);
 
   public static class RawFileScanner extends FileScanner implements SeekableScanner {
@@ -380,7 +383,7 @@ public class RawFile {
         tableStats.setNumRows(recordCount);
       }
 
-      StorageUtil.closeBuffer(buffer);
+      UnsafeUtil.free(buffer);
       IOUtils.cleanup(LOG, channel, fis);
     }
 
@@ -722,7 +725,7 @@ public class RawFile {
         LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
       }
 
-      StorageUtil.closeBuffer(buffer);
+      UnsafeUtil.free(buffer);
       IOUtils.cleanup(LOG, channel, randomAccessFile);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 70044ca..24b6280 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -23,8 +23,10 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.exception.UnknownDataTypeException;
+import org.apache.tajo.tuple.offheap.RowWriter;
 import org.apache.tajo.util.BitArray;
 
 import java.nio.ByteBuffer;
@@ -177,7 +179,8 @@ public class RowStoreUtil {
       nullFlags = new BitArray(schema.size());
       headerSize = nullFlags.bytesLength();
     }
-    public byte [] toBytes(Tuple tuple) {
+
+    public byte[] toBytes(Tuple tuple) {
       nullFlags.clear();
       int size = estimateTupleDataSize(tuple);
       ByteBuffer bb = ByteBuffer.allocate(size + headerSize);
@@ -191,42 +194,64 @@ public class RowStoreUtil {
 
         col = schema.getColumn(i);
         switch (col.getDataType().getType()) {
-          case NULL_TYPE: nullFlags.set(i); break;
-          case BOOLEAN: bb.put(tuple.get(i).asByte()); break;
-          case BIT: bb.put(tuple.get(i).asByte()); break;
-          case CHAR: bb.put(tuple.get(i).asByte()); break;
-          case INT2: bb.putShort(tuple.get(i).asInt2()); break;
-          case INT4: bb.putInt(tuple.get(i).asInt4()); break;
-          case INT8: bb.putLong(tuple.get(i).asInt8()); break;
-          case FLOAT4: bb.putFloat(tuple.get(i).asFloat4()); break;
-          case FLOAT8: bb.putDouble(tuple.get(i).asFloat8()); break;
-          case TEXT:
-            byte [] _string = tuple.get(i).asByteArray();
-            bb.putInt(_string.length);
-            bb.put(_string);
-            break;
-          case DATE: bb.putInt(tuple.get(i).asInt4()); break;
-          case TIME:
-          case TIMESTAMP:
-            bb.putLong(tuple.get(i).asInt8());
-            break;
-          case INTERVAL:
-            IntervalDatum interval = (IntervalDatum) tuple.get(i);
-            bb.putInt(interval.getMonths());
-            bb.putLong(interval.getMilliSeconds());
-            break;
-          case BLOB:
-            byte [] bytes = tuple.get(i).asByteArray();
-            bb.putInt(bytes.length);
-            bb.put(bytes);
-            break;
-          case INET4:
-            byte [] ipBytes = tuple.get(i).asByteArray();
-            bb.put(ipBytes);
-            break;
-          case INET6: bb.put(tuple.get(i).asByteArray()); break;
-          default:
-            throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+        case NULL_TYPE:
+          nullFlags.set(i);
+          break;
+        case BOOLEAN:
+          bb.put(tuple.get(i).asByte());
+          break;
+        case BIT:
+          bb.put(tuple.get(i).asByte());
+          break;
+        case CHAR:
+          bb.put(tuple.get(i).asByte());
+          break;
+        case INT2:
+          bb.putShort(tuple.get(i).asInt2());
+          break;
+        case INT4:
+          bb.putInt(tuple.get(i).asInt4());
+          break;
+        case INT8:
+          bb.putLong(tuple.get(i).asInt8());
+          break;
+        case FLOAT4:
+          bb.putFloat(tuple.get(i).asFloat4());
+          break;
+        case FLOAT8:
+          bb.putDouble(tuple.get(i).asFloat8());
+          break;
+        case TEXT:
+          byte[] _string = tuple.get(i).asByteArray();
+          bb.putInt(_string.length);
+          bb.put(_string);
+          break;
+        case DATE:
+          bb.putInt(tuple.get(i).asInt4());
+          break;
+        case TIME:
+        case TIMESTAMP:
+          bb.putLong(tuple.get(i).asInt8());
+          break;
+        case INTERVAL:
+          IntervalDatum interval = (IntervalDatum) tuple.get(i);
+          bb.putInt(interval.getMonths());
+          bb.putLong(interval.getMilliSeconds());
+          break;
+        case BLOB:
+          byte[] bytes = tuple.get(i).asByteArray();
+          bb.putInt(bytes.length);
+          bb.put(bytes);
+          break;
+        case INET4:
+          byte[] ipBytes = tuple.get(i).asByteArray();
+          bb.put(ipBytes);
+          break;
+        case INET6:
+          bb.put(tuple.get(i).asByteArray());
+          break;
+        default:
+          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
         }
       }
 
@@ -237,7 +262,7 @@ public class RowStoreUtil {
 
       bb.position(finalPosition);
       bb.flip();
-      byte [] buf = new byte [bb.limit()];
+      byte[] buf = new byte[bb.limit()];
       bb.get(buf);
       return buf;
     }
@@ -254,24 +279,38 @@ public class RowStoreUtil {
 
         col = schema.getColumn(i);
         switch (col.getDataType().getType()) {
-          case BOOLEAN:
-          case BIT:
-          case CHAR: size += 1; break;
-          case INT2: size += 2; break;
-          case DATE:
-          case INT4:
-          case FLOAT4: size += 4; break;
-          case TIME:
-          case TIMESTAMP:
-          case INT8:
-          case FLOAT8: size += 8; break;
-          case INTERVAL: size += 12; break;
-          case TEXT:
-          case BLOB: size += (4 + tuple.get(i).asByteArray().length); break;
-          case INET4:
-          case INET6: size += tuple.get(i).asByteArray().length; break;
-          default:
-            throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
+        case BOOLEAN:
+        case BIT:
+        case CHAR:
+          size += 1;
+          break;
+        case INT2:
+          size += 2;
+          break;
+        case DATE:
+        case INT4:
+        case FLOAT4:
+          size += 4;
+          break;
+        case TIME:
+        case TIMESTAMP:
+        case INT8:
+        case FLOAT8:
+          size += 8;
+          break;
+        case INTERVAL:
+          size += 12;
+          break;
+        case TEXT:
+        case BLOB:
+          size += (4 + tuple.get(i).asByteArray().length);
+          break;
+        case INET4:
+        case INET6:
+          size += tuple.get(i).asByteArray().length;
+          break;
+        default:
+          throw new RuntimeException(new UnknownDataTypeException(col.getDataType().getType().name()));
         }
       }
 
@@ -284,4 +323,55 @@ public class RowStoreUtil {
       return schema;
     }
   }
+
+  public static void convert(Tuple tuple, RowWriter writer) {
+    writer.startRow();
+
+    for (int i = 0; i < writer.dataTypes().length; i++) {
+      if (tuple.isNull(i)) {
+        writer.skipField();
+        continue;
+      }
+      switch (writer.dataTypes()[i].getType()) {
+      case BOOLEAN:
+        writer.putBool(tuple.getBool(i));
+        break;
+      case INT1:
+      case INT2:
+        writer.putInt2(tuple.getInt2(i));
+        break;
+      case INT4:
+      case DATE:
+      case INET4:
+        writer.putInt4(tuple.getInt4(i));
+        break;
+      case INT8:
+      case TIMESTAMP:
+      case TIME:
+        writer.putInt8(tuple.getInt8(i));
+        break;
+      case FLOAT4:
+        writer.putFloat4(tuple.getFloat4(i));
+        break;
+      case FLOAT8:
+        writer.putFloat8(tuple.getFloat8(i));
+        break;
+      case TEXT:
+        writer.putText(tuple.getBytes(i));
+        break;
+      case INTERVAL:
+        writer.putInterval((IntervalDatum) tuple.getInterval(i));
+        break;
+      case PROTOBUF:
+        writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
+        break;
+      case NULL_TYPE:
+        writer.skipField();
+        break;
+      default:
+        throw new UnsupportedException("Unknown data type: " + writer.dataTypes()[i]);
+      }
+    }
+    writer.endRow();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 07fa16b..f35c9ee 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -30,13 +30,11 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;
 import parquet.hadoop.ParquetOutputFormat;
-import sun.nio.ch.DirectBuffer;
 
 import java.io.DataInput;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -193,16 +191,6 @@ public class StorageUtil extends StorageConstants {
     }
   }
 
-  public static void closeBuffer(ByteBuffer buffer) {
-    if (buffer != null) {
-      if (buffer.isDirect()) {
-        ((DirectBuffer) buffer).cleaner().clean();
-      } else {
-        buffer.clear();
-      }
-    }
-  }
-
   public static int readFully(InputStream is, byte[] buffer, int offset, int length)
       throws IOException {
     int nread = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
index a2c08de..64e62ba 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -26,7 +26,6 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
 
 /**
  * This class is not thread-safe.
@@ -69,6 +68,11 @@ public class TableStatistics {
     numRows++;
   }
 
+  public void incrementRows(long num) {
+    numRows += num;
+  }
+
+
   public long getNumRows() {
     return this.numRows;
   }
@@ -82,7 +86,7 @@ public class TableStatistics {
   }
 
   public void analyzeField(int idx, Datum datum) {
-    if (datum instanceof NullDatum) {
+    if (datum.isNull()) {
       numNulls[idx]++;
       return;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
index d2ccdc7..b42c1b5 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java
@@ -45,7 +45,7 @@ public class TextSerializerDeserializer implements SerializerDeserializer {
     int length = 0;
     TajoDataTypes.DataType dataType = col.getDataType();
 
-    if (datum == null || datum instanceof NullDatum) {
+    if (datum == null || datum.isNull()) {
       switch (dataType.getType()) {
         case CHAR:
         case TEXT:

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
index c183171..53e68c7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Tuple.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.storage;
 
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.ProtobufDatum;
 
 public interface Tuple extends Cloneable {
   
@@ -28,6 +27,9 @@ public interface Tuple extends Cloneable {
 	public boolean contains(int fieldid);
 
   public boolean isNull(int fieldid);
+
+  @SuppressWarnings("unused")
+  public boolean isNotNull(int fieldid);
 	
 	public void clear();
 	
@@ -65,7 +67,9 @@ public interface Tuple extends Cloneable {
 	
 	public String getText(int fieldId);
 
-  public ProtobufDatum getProtobufDatum(int fieldId);
+  public Datum getProtobufDatum(int fieldId);
+
+  public Datum getInterval(int fieldId);
 
   public char [] getUnicodeChars(int fieldId);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
index 51388a4..720226b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java
@@ -1,4 +1,4 @@
-/**
+/***
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,164 +18,8 @@
 
 package org.apache.tajo.storage;
 
-import com.google.common.base.Objects;
-import com.google.common.base.Preconditions;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.NullDatum;
-
 import java.util.Comparator;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.TupleComparatorSpecProto;
-import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
-
-/**
- * The Comparator class for Tuples
- * 
- * @see Tuple
- */
-public class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> {
-  private final int[] sortKeyIds;
-  private final boolean[] asc;
-  @SuppressWarnings("unused")
-  private final boolean[] nullFirsts;  
-
-  private Datum left;
-  private Datum right;
-  private int compVal;
-
-  /**
-   * @param schema The schema of input tuples
-   * @param sortKeys The description of sort keys
-   */
-  public TupleComparator(Schema schema, SortSpec[] sortKeys) {
-    Preconditions.checkArgument(sortKeys.length > 0, 
-        "At least one sort key must be specified.");
-
-    this.sortKeyIds = new int[sortKeys.length];
-    this.asc = new boolean[sortKeys.length];
-    this.nullFirsts = new boolean[sortKeys.length];
-    for (int i = 0; i < sortKeys.length; i++) {
-      if (sortKeys[i].getSortKey().hasQualifier()) {
-        this.sortKeyIds[i] = schema.getColumnId(sortKeys[i].getSortKey().getQualifiedName());
-      } else {
-        this.sortKeyIds[i] = schema.getColumnIdByName(sortKeys[i].getSortKey().getSimpleName());
-      }
-          
-      this.asc[i] = sortKeys[i].isAscending();
-      this.nullFirsts[i]= sortKeys[i].isNullFirst();
-    }
-  }
-
-  public TupleComparator(TupleComparatorProto proto) {
-    this.sortKeyIds = new int[proto.getCompSpecsCount()];
-    this.asc = new boolean[proto.getCompSpecsCount()];
-    this.nullFirsts = new boolean[proto.getCompSpecsCount()];
-
-    for (int i = 0; i < proto.getCompSpecsCount(); i++) {
-      TupleComparatorSpecProto sortSepcProto = proto.getCompSpecs(i);
-      sortKeyIds[i] = sortSepcProto.getColumnId();
-      asc[i] = sortSepcProto.getAscending();
-      nullFirsts[i] = sortSepcProto.getNullFirst();
-    }
-  }
-
-  public boolean isAscendingFirstKey() {
-    return this.asc[0];
-  }
-
-  @Override
-  public int compare(Tuple tuple1, Tuple tuple2) {
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      left = tuple1.get(sortKeyIds[i]);
-      right = tuple2.get(sortKeyIds[i]);
-
-      if (left instanceof NullDatum || right instanceof NullDatum) {
-        if (!left.equals(right)) {
-          if (left instanceof NullDatum) {
-            compVal = 1;
-          } else if (right instanceof NullDatum) {
-            compVal = -1;
-          }
-          if (nullFirsts[i]) {
-            if (compVal != 0) {
-              compVal *= -1;
-            }
-          }
-        } else {
-          compVal = 0;
-        }
-      } else {
-        if (asc[i]) {
-          compVal = left.compareTo(right);
-        } else {
-          compVal = right.compareTo(left);
-        }
-      }
-
-      if (compVal < 0 || compVal > 0) {
-        return compVal;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(sortKeyIds);
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof TupleComparator) {
-      TupleComparator other = (TupleComparator) obj;
-      if (sortKeyIds.length != other.sortKeyIds.length) {
-        return false;
-      }
-
-      for (int i = 0; i < sortKeyIds.length; i++) {
-        if (sortKeyIds[i] != other.sortKeyIds[i] ||
-            asc[i] != other.asc[i] ||
-            nullFirsts[i] != other.nullFirsts[i]) {
-          return false;
-        }
-      }
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  @Override
-  public TupleComparatorProto getProto() {
-    TupleComparatorProto.Builder builder = TupleComparatorProto.newBuilder();
-    TupleComparatorSpecProto.Builder sortSpecBuilder;
-
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      sortSpecBuilder = TupleComparatorSpecProto.newBuilder();
-      sortSpecBuilder.setColumnId(sortKeyIds[i]);
-      sortSpecBuilder.setAscending(asc[i]);
-      sortSpecBuilder.setNullFirst(nullFirsts[i]);
-      builder.addCompSpecs(sortSpecBuilder);
-    }
-
-    return builder.build();
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-
-    String prefix = "";
-    for (int i = 0; i < sortKeyIds.length; i++) {
-      sb.append(prefix).append("SortKeyId=").append(sortKeyIds[i])
-        .append(",Asc=").append(asc[i])
-        .append(",NullFirst=").append(nullFirsts[i]);
-      prefix = " ,";
-    }
-    return sb.toString();
-  }
-}
\ No newline at end of file
+public abstract class TupleComparator implements Comparator<Tuple> {
+  public abstract int compare(Tuple o1, Tuple o2);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
index 6cc09d4..dba02f7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java
@@ -30,10 +30,10 @@ import java.util.Comparator;
 public class TupleRange implements Comparable<TupleRange>, Cloneable {
   private Tuple start;
   private Tuple end;
-  private final TupleComparator comp;
+  private final BaseTupleComparator comp;
 
   public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) {
-    this.comp = new TupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
+    this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs);
     // if there is only one value, start == end
     this.start = start;
     this.end = end;

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
index 4fb35f9..0e2560c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/VTuple.java
@@ -19,10 +19,7 @@
 package org.apache.tajo.storage;
 
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.Inet4Datum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.*;
 import org.apache.tajo.exception.UnimplementedException;
 
 import java.net.InetAddress;
@@ -38,7 +35,6 @@ public class VTuple implements Tuple, Cloneable {
 
   public VTuple(Tuple tuple) {
     this.values = tuple.getValues().clone();
-    this.offset = ((VTuple)tuple).offset;
   }
 
   public VTuple(Datum [] datum) {
@@ -57,7 +53,12 @@ public class VTuple implements Tuple, Cloneable {
 
   @Override
   public boolean isNull(int fieldid) {
-    return values[fieldid] instanceof NullDatum;
+    return values[fieldid].isNull();
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return !isNull(fieldid);
   }
 
   @Override
@@ -179,6 +180,11 @@ public class VTuple implements Tuple, Cloneable {
   }
 
   @Override
+  public IntervalDatum getInterval(int fieldId) {
+    return (IntervalDatum) values[fieldId];
+  }
+
+  @Override
   public char[] getUnicodeChars(int fieldId) {
     return values[fieldId].asUnicodeChars();
   }
@@ -193,23 +199,7 @@ public class VTuple implements Tuple, Cloneable {
   }
 
   public String toString() {
-		boolean first = true;
-		StringBuilder str = new StringBuilder();
-		str.append("(");
-		for(int i=0; i < values.length; i++) {			
-			if(values[i] != null) {
-				if(first) {
-					first = false;
-				} else {
-					str.append(", ");
-				}
-				str.append(i)
-				.append("=>")
-				.append(values[i]);
-			}
-		}
-		str.append(")");
-		return str.toString();
+		return toDisplayString(getValues());
 	}
 
 	@Override
@@ -230,4 +220,24 @@ public class VTuple implements Tuple, Cloneable {
     }
     return false;
   }
+
+  public static String toDisplayString(Datum [] values) {
+    boolean first = true;
+    StringBuilder str = new StringBuilder();
+    str.append("(");
+    for(int i=0; i < values.length; i++) {
+      if(values[i] != null) {
+        if(first) {
+          first = false;
+        } else {
+          str.append(", ");
+        }
+        str.append(i)
+            .append("=>")
+            .append(values[i]);
+      }
+    }
+    str.append(")");
+    return str.toString();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index 6af8da0..6aca8d7 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -18,30 +18,27 @@
 
 package org.apache.tajo.storage.avro;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.storage.FileAppender;
 import org.apache.tajo.storage.TableStatistics;
 import org.apache.tajo.storage.Tuple;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
 
 /**
  * FileAppender for writing to Avro files.
@@ -102,7 +99,7 @@ public class AvroAppender extends FileAppender {
   }
 
   private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
-    if (tuple.get(i) instanceof NullDatum) {
+    if (tuple.get(i).isNull()) {
       return null;
     }
     switch (avroType) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
index 74be7ff..7024bdc 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java
@@ -20,13 +20,13 @@ package org.apache.tajo.storage.index;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.BaseTupleComparator;
 
 import java.io.IOException;
 
 public interface IndexMethod {
   IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
-      TupleComparator comparator) throws IOException;
+      BaseTupleComparator comparator) throws IOException;
   IndexReader getIndexReader(final Path fileName, Schema keySchema,
-      TupleComparator comparator) throws IOException;
+      BaseTupleComparator comparator) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
index 5d43bd5..d24d474 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -27,12 +27,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.storage.BaseTupleComparator;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.index.IndexMethod;
 import org.apache.tajo.storage.index.IndexWriter;
 import org.apache.tajo.storage.index.OrderIndexReader;
@@ -67,13 +67,13 @@ public class BSTIndex implements IndexMethod {
   
   @Override
   public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
-      TupleComparator comparator) throws IOException {
+      BaseTupleComparator comparator) throws IOException {
     return new BSTIndexWriter(fileName, level, keySchema, comparator);
   }
 
   @Override
   public BSTIndexReader getIndexReader(Path fileName, Schema keySchema,
-      TupleComparator comparator) throws IOException {
+      BaseTupleComparator comparator) throws IOException {
     return new BSTIndexReader(fileName, keySchema, comparator);
   }
 
@@ -89,7 +89,7 @@ public class BSTIndex implements IndexMethod {
     private Path fileName;
 
     private final Schema keySchema;
-    private final TupleComparator compartor;
+    private final BaseTupleComparator compartor;
     private final KeyOffsetCollector collector;
     private KeyOffsetCollector rootCollector;
 
@@ -108,7 +108,7 @@ public class BSTIndex implements IndexMethod {
      * @throws IOException
      */
     public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
-        TupleComparator comparator) throws IOException {
+        BaseTupleComparator comparator) throws IOException {
       this.fileName = fileName;
       this.level = level;
       this.keySchema = keySchema;
@@ -141,7 +141,7 @@ public class BSTIndex implements IndexMethod {
       collector.put(key, offset);
     }
 
-    public TupleComparator getComparator() {
+    public BaseTupleComparator getComparator() {
       return this.compartor;
     }
 
@@ -253,7 +253,7 @@ public class BSTIndex implements IndexMethod {
     private class KeyOffsetCollector {
       private TreeMap<Tuple, LinkedList<Long>> map;
 
-      public KeyOffsetCollector(TupleComparator comparator) {
+      public KeyOffsetCollector(BaseTupleComparator comparator) {
         map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
       }
 
@@ -283,7 +283,7 @@ public class BSTIndex implements IndexMethod {
   public class BSTIndexReader implements OrderIndexReader , Closeable{
     private Path fileName;
     private Schema keySchema;
-    private TupleComparator comparator;
+    private BaseTupleComparator comparator;
 
     private FileSystem fs;
     private FSDataInputStream indexIn;
@@ -312,7 +312,7 @@ public class BSTIndex implements IndexMethod {
      * @param comparator
      * @throws IOException
      */
-    public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
+    public BSTIndexReader(final Path fileName, Schema keySchema, BaseTupleComparator comparator) throws IOException {
       this.fileName = fileName;
       this.keySchema = keySchema;
       this.comparator = comparator;
@@ -327,7 +327,7 @@ public class BSTIndex implements IndexMethod {
       return this.keySchema;
     }
 
-    public TupleComparator getComparator() {
+    public BaseTupleComparator getComparator() {
       return this.comparator;
     }
 
@@ -350,7 +350,7 @@ public class BSTIndex implements IndexMethod {
 
       TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
       compProto.mergeFrom(compBytes);
-      this.comparator = new TupleComparator(compProto.build());
+      this.comparator = new BaseTupleComparator(compProto.build());
 
       // level
       this.level = indexIn.readInt();

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
new file mode 100644
index 0000000..e0c7c97
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rawfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.SeekableScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlockReader;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+
+public class DirectRawFileScanner extends FileScanner implements SeekableScanner {
+  private static final Log LOG = LogFactory.getLog(DirectRawFileScanner.class);
+
+  private FileChannel channel;
+  private TajoDataTypes.DataType[] columnTypes;
+  private Path path;
+
+  private boolean eof = false;
+  private long fileSize;
+  private FileInputStream fis;
+  private long recordCount;
+
+  private ZeroCopyTuple unSafeTuple = new ZeroCopyTuple();
+  private OffHeapRowBlock tupleBuffer;
+  private OffHeapRowBlockReader reader = new OffHeapRowBlockReader(tupleBuffer);
+
+  public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+    super(conf, schema, meta, null);
+    this.path = path;
+  }
+
+  @SuppressWarnings("unused")
+  public DirectRawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+    this(conf, schema, meta, fragment.getPath());
+  }
+
+  public void init() throws IOException {
+    File file;
+    try {
+      if (path.toUri().getScheme() != null) {
+        file = new File(path.toUri());
+      } else {
+        file = new File(path.toString());
+      }
+    } catch (IllegalArgumentException iae) {
+      throw new IOException(iae);
+    }
+    fis = new FileInputStream(file);
+    channel = fis.getChannel();
+    fileSize = channel.size();
+
+    if (tableStats != null) {
+      tableStats.setNumBytes(fileSize);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RawFileScanner open:" + path + "," + channel.position() + ", size :" + channel.size());
+    }
+
+    columnTypes = new TajoDataTypes.DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      columnTypes[i] = schema.getColumn(i).getDataType();
+    }
+
+    tupleBuffer = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
+    reader = new OffHeapRowBlockReader(tupleBuffer);
+
+    fetchNeeded = !next(tupleBuffer);
+
+    super.init();
+  }
+
+  @Override
+  public long getNextOffset() throws IOException {
+    return channel.position() - reader.remainForRead();
+  }
+
+  @Override
+  public void seek(long offset) throws IOException {
+    channel.position(offset);
+    fetchNeeded = true;
+  }
+
+  public boolean next(OffHeapRowBlock rowblock) throws IOException {
+    return rowblock.copyFromChannel(channel, tableStats);
+  }
+
+  private boolean fetchNeeded = true;
+
+  @Override
+  public Tuple next() throws IOException {
+    if(eof) {
+      return null;
+    }
+
+    while(true) {
+      if (fetchNeeded) {
+        if (!next(tupleBuffer)) {
+          return null;
+        }
+        reader.reset();
+      }
+
+      fetchNeeded = !reader.next(unSafeTuple);
+
+      if (!fetchNeeded) {
+        recordCount++;
+        return unSafeTuple;
+      }
+    }
+  }
+
+  @Override
+  public void reset() throws IOException {
+    // reload initial buffer
+    fetchNeeded = true;
+    channel.position(0);
+    eof = false;
+    reader.reset();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (tableStats != null) {
+      tableStats.setReadBytes(fileSize);
+      tableStats.setNumRows(recordCount);
+    }
+    tupleBuffer.release();
+    tupleBuffer = null;
+    reader = null;
+    IOUtils.cleanup(LOG, channel, fis);
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return false;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    try {
+      tableStats.setNumRows(recordCount);
+      long filePos = 0;
+      if (channel != null) {
+        filePos = channel.position();
+        tableStats.setReadBytes(filePos);
+      }
+
+      if(eof || channel == null) {
+        tableStats.setReadBytes(fileSize);
+        return 1.0f;
+      }
+
+      if (filePos == 0) {
+        return 0.0f;
+      } else {
+        return Math.min(1.0f, ((float)filePos / (float)fileSize));
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      return 0.0f;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
new file mode 100644
index 0000000..1108163
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.rawfile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.BaseTupleBuilder;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.UnSafeTuple;
+import org.apache.tajo.unit.StorageUnit;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+public class DirectRawFileWriter extends FileAppender {
+  public static final String FILE_EXTENSION = "draw";
+  private static final Log LOG = LogFactory.getLog(DirectRawFileWriter.class);
+
+  private FileChannel channel;
+  private RandomAccessFile randomAccessFile;
+  private TajoDataTypes.DataType[] columnTypes;
+  private long pos;
+
+  private TableStatistics stats;
+
+  private BaseTupleBuilder builder;
+
+  public DirectRawFileWriter(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException {
+    super(conf, schema, meta, path);
+  }
+
+  public void init() throws IOException {
+    File file;
+    try {
+      if (path.toUri().getScheme() != null) {
+        file = new File(path.toUri());
+      } else {
+        file = new File(path.toString());
+      }
+    } catch (IllegalArgumentException iae) {
+      throw new IOException(iae);
+    }
+
+    randomAccessFile = new RandomAccessFile(file, "rw");
+    channel = randomAccessFile.getChannel();
+    pos = 0;
+
+    columnTypes = new TajoDataTypes.DataType[schema.size()];
+    for (int i = 0; i < schema.size(); i++) {
+      columnTypes[i] = schema.getColumn(i).getDataType();
+    }
+
+    if (enabledStats) {
+      this.stats = new TableStatistics(this.schema);
+    }
+
+    builder = new BaseTupleBuilder(schema);
+
+    super.init();
+  }
+
+  @Override
+  public long getOffset() throws IOException {
+    return pos;
+  }
+
+  public void writeRowBlock(OffHeapRowBlock rowBlock) throws IOException {
+    channel.write(rowBlock.nioBuffer());
+    if (enabledStats) {
+      stats.incrementRows(rowBlock.rows());
+    }
+
+    pos = channel.position();
+  }
+
+  private ByteBuffer buffer;
+  private void ensureSize(int size) throws IOException {
+    if (buffer.remaining() < size) {
+
+      buffer.limit(buffer.position());
+      buffer.flip();
+      channel.write(buffer);
+
+      buffer.clear();
+    }
+  }
+
+  @Override
+  public void addTuple(Tuple t) throws IOException {
+    if (enabledStats) {
+      for (int i = 0; i < schema.size(); i++) {
+        stats.analyzeField(i, t.get(i));
+      }
+    }
+
+    if (buffer == null) {
+      buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB);
+    }
+
+    UnSafeTuple unSafeTuple;
+
+    if (!(t instanceof UnSafeTuple)) {
+      RowStoreUtil.convert(t, builder);
+      unSafeTuple = builder.buildToZeroCopyTuple();
+    } else {
+      unSafeTuple = (UnSafeTuple) t;
+    }
+
+    ByteBuffer bb = unSafeTuple.nioBuffer();
+    ensureSize(bb.limit());
+    buffer.put(bb);
+
+    pos = channel.position() + (buffer.limit() - buffer.remaining());
+
+    if (enabledStats) {
+      stats.incrementRow();
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (buffer != null) {
+      buffer.limit(buffer.position());
+      buffer.flip();
+      channel.write(buffer);
+      buffer.clear();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+    if (enabledStats) {
+      stats.setNumBytes(getOffset());
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RawFileAppender written: " + getOffset() + " bytes, path: " + path);
+    }
+
+    IOUtils.cleanup(LOG, channel, randomAccessFile);
+  }
+
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      stats.setNumBytes(pos);
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
new file mode 100644
index 0000000..ec08250
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -0,0 +1,110 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.*;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
+  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
+
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  // buffer
+  private ByteBuffer buffer;
+  private long address;
+
+  public BaseTupleBuilder(Schema schema) {
+    super(SchemaUtil.toDataTypes(schema));
+    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
+    address = UnsafeUtil.getAddress(buffer);
+  }
+
+  @Override
+  public long address() {
+    return address;
+  }
+
+  public void ensureSize(int size) {
+    if (buffer.remaining() - size < 0) { // check the remain size
+      // enlarge new buffer and copy writing data
+      int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
+      ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
+      long newAddress = ((DirectBuffer)newByteBuf).address();
+      UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
+      LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+
+      // release existing buffer and replace variables
+      UnsafeUtil.free(buffer);
+      buffer = newByteBuf;
+      address = newAddress;
+    }
+  }
+
+  @Override
+  public int position() {
+    return 0;
+  }
+
+  @Override
+  public void forward(int length) {
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+    buffer.position(0).limit(offset());
+  }
+
+  @Override
+  public Tuple build() {
+    return buildToHeapTuple();
+  }
+
+  public HeapTuple buildToHeapTuple() {
+    byte [] bytes = new byte[buffer.limit()];
+    UNSAFE.copyMemory(null, address, bytes, Unsafe.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit());
+    return new HeapTuple(bytes, dataTypes());
+  }
+
+  public ZeroCopyTuple buildToZeroCopyTuple() {
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
+    return zcTuple;
+  }
+
+  public void release() {
+    UnsafeUtil.free(buffer);
+    buffer = null;
+    address = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
new file mode 100644
index 0000000..be734e1
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+
+public interface RowBlockReader<T extends Tuple> {
+
+  /**
+   * Return for each tuple
+   *
+   * @return True if tuple block is filled with tuples. Otherwise, It will return false.
+   */
+  public boolean next(T tuple);
+
+  public void reset();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
new file mode 100644
index 0000000..c43c018
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
@@ -0,0 +1,26 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+public interface TupleBuilder extends RowWriter {
+  public Tuple build();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
new file mode 100644
index 0000000..9662d5a
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.UnsafeUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
+  private ByteBuffer bb;
+
+  public DirectBufTuple(int length, DataType[] types) {
+    bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
+    set(bb, 0, length, types);
+  }
+
+  @Override
+  public void release() {
+    UnsafeUtil.free(bb);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
new file mode 100644
index 0000000..a327123
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+/**
+ * Fixed size limit specification
+ */
+public class FixedSizeLimitSpec extends ResizableLimitSpec {
+  public FixedSizeLimitSpec(long size) {
+    super(size, size);
+  }
+
+  public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
+    super(size, size, allowedOverflowRatio);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
new file mode 100644
index 0000000..7c7d8a1
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -0,0 +1,269 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.sun.tools.javac.util.Convert;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class HeapTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  private static final long BASE_OFFSET = Unsafe.ARRAY_BYTE_BASE_OFFSET;
+
+  private final byte [] data;
+  private final DataType [] types;
+
+  public HeapTuple(final byte [] bytes, final DataType [] types) {
+    this.data = bytes;
+    this.types = types;
+  }
+
+  @Override
+  public int size() {
+    return data.length;
+  }
+
+  public ByteBuffer nioBuffer() {
+    return ByteBuffer.wrap(data);
+  }
+
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  private int checkNullAndGetOffset(int fieldId) {
+    int offset = getFieldOffset(fieldId);
+    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return offset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt4(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UNSAFE.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId));
+  }
+
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UNSAFE.ARRAY_BYTE_BASE_OFFSET, len);
+    return Convert.utf2chars(bytes);
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return this;
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
new file mode 100644
index 0000000..2f8e349
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
+
+  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  protected ByteBuffer buffer;
+  protected int memorySize;
+  protected ResizableLimitSpec limitSpec;
+  protected long address;
+
+  @VisibleForTesting
+  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
+    this.buffer = buffer;
+    this.address = ((DirectBuffer) buffer).address();
+    this.memorySize = buffer.limit();
+    this.limitSpec = limitSpec;
+  }
+
+  public OffHeapMemory(ResizableLimitSpec limitSpec) {
+    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
+  }
+
+  public long address() {
+    return address;
+  }
+
+  public long size() {
+    return memorySize;
+  }
+
+  public void resize(int newSize) {
+    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
+
+    if (newSize > limitSpec.limit()) {
+      throw new RuntimeException("Resize cannot exceed the size limit");
+    }
+
+    if (newSize < memorySize) {
+      LOG.warn("The size reduction is ignored.");
+    }
+
+    int newBlockSize = UnsafeUtil.alignedSize(newSize);
+    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
+    long newAddress = ((DirectBuffer)newByteBuf).address();
+
+    UNSAFE.copyMemory(this.address, newAddress, memorySize);
+
+    UnsafeUtil.free(buffer);
+    this.memorySize = newSize;
+    this.buffer = newByteBuf;
+    this.address = newAddress;
+  }
+
+  public java.nio.Buffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(memorySize);
+  }
+
+  @Override
+  public void release() {
+    UnsafeUtil.free(this.buffer);
+    this.buffer = null;
+    this.address = 0;
+    this.memorySize = 0;
+  }
+
+  public String toString() {
+    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/e6e2a6b7/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
new file mode 100644
index 0000000..689efb7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -0,0 +1,176 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.SizeOf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
+
+  public static final int NULL_FIELD_OFFSET = -1;
+
+  DataType [] dataTypes;
+
+  // Basic States
+  private int maxRowNum = Integer.MAX_VALUE; // optional
+  private int rowNum;
+  protected int position = 0;
+
+  private OffHeapRowBlockWriter builder;
+
+  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
+    super(buffer, limitSpec);
+    initialize(schema);
+  }
+
+  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
+    super(limitSpec);
+    initialize(schema);
+  }
+
+  private void initialize(Schema schema) {
+    dataTypes = SchemaUtil.toDataTypes(schema);
+
+    this.builder = new OffHeapRowBlockWriter(this);
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, int bytes) {
+    this(schema, new ResizableLimitSpec(bytes));
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
+    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
+  }
+
+  public void position(int pos) {
+    this.position = pos;
+  }
+
+  public void clear() {
+    this.position = 0;
+    this.rowNum = 0;
+
+    builder.clear();
+  }
+
+  @Override
+  public ByteBuffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(position);
+  }
+
+  public int position() {
+    return position;
+  }
+
+  public long usedMem() {
+    return position;
+  }
+
+  /**
+   * Ensure that this buffer has enough remaining space to add the size.
+   * Creates and copies to a new buffer if necessary
+   *
+   * @param size Size to add
+   */
+  public void ensureSize(int size) {
+    if (remain() - size < 0) {
+      if (!limitSpec.canIncrease(memorySize)) {
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+
+      int newBlockSize = limitSpec.increasedSize(memorySize);
+      resize(newBlockSize);
+      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+  }
+
+  public long remain() {
+    return memorySize - position - builder.offset();
+  }
+
+  public int maxRowNum() {
+    return maxRowNum;
+  }
+  public int rows() {
+    return rowNum;
+  }
+
+  public void setRows(int rowNum) {
+    this.rowNum = rowNum;
+  }
+
+  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
+    if (channel.position() < channel.size()) {
+      clear();
+
+      buffer.clear();
+      channel.read(buffer);
+      memorySize = buffer.position();
+
+      while (position < memorySize) {
+        long recordPtr = address + position;
+
+        if (remain() < SizeOf.SIZE_OF_INT) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        int recordSize = UNSAFE.getInt(recordPtr);
+
+        if (remain() < recordSize) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        position += recordSize;
+        rowNum++;
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public RowWriter getWriter() {
+    return builder;
+  }
+
+  public OffHeapRowBlockReader getReader() {
+    return new OffHeapRowBlockReader(this);
+  }
+}