You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/09/03 14:45:33 UTC

[3/4] tajo git commit: TAJO-1738: Improve off-heap RowBlock.

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
new file mode 100644
index 0000000..3dc8c23
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTuple.java
@@ -0,0 +1,342 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+import org.apache.tajo.util.datetime.TimeMeta;
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class UnSafeTuple extends ZeroCopyTuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  private long address;
+  private DataType[] types;
+
+  @Override
+  public void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types) {
+    Preconditions.checkArgument(memoryBlock.hasAddress());
+
+    this.address = memoryBlock.address();
+    this.types = types;
+    super.set(relativePos, length);
+  }
+
+  public void set(UnSafeTuple tuple) {
+    this.address = tuple.address;
+    this.types = tuple.types;
+    super.set(tuple.getRelativePos(), tuple.getLength());
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  @Override
+  public TajoDataTypes.Type type(int fieldId) {
+    return types[fieldId].getType();
+  }
+
+  @Override
+  public int size(int fieldId) {
+    return PlatformDependent.getInt(getFieldAddr(fieldId));
+  }
+
+  public void writeTo(ByteBuffer bb) {
+    if (bb.remaining() < getLength()) {
+      throw new IndexOutOfBoundsException("remaining length: " + bb.remaining()
+          + ", tuple length: " + getLength());
+    }
+
+    if (getLength() > 0) {
+      if (bb.isDirect()) {
+        PlatformDependent.copyMemory(address(), PlatformDependent.directBufferAddress(bb) + bb.position(), getLength());
+        bb.position(bb.position() + getLength());
+      } else {
+        PlatformDependent.copyMemory(address(), bb.array(), bb.arrayOffset() + bb.position(), getLength());
+        bb.position(bb.position() + getLength());
+      }
+    }
+  }
+
+  public long address() {
+    return address + getRelativePos();
+  }
+
+  public HeapTuple toHeapTuple() {
+    HeapTuple heapTuple = new HeapTuple();
+    byte [] bytes = new byte[getLength()];
+    PlatformDependent.copyMemory(address(), bytes, 0, getLength());
+    heapTuple.set(bytes, types);
+    return heapTuple;
+  }
+
+  private int getFieldOffset(int fieldId) {
+    return PlatformDependent.getInt(address()+ (long)(SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT)));
+  }
+
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    if (fieldOffset == -1) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return address() + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > MemoryRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isBlank(int fieldid) {
+    return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isBlankOrNull(int fieldid) {
+    return getFieldOffset(fieldid) == MemoryRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new TajoRuntimeException(new UnsupportedException());
+  }
+
+  @Override
+  public Datum asDatum(int fieldId) {
+    if (isBlankOrNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case BIT:
+      return DatumFactory.createBit(getByte(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt8(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case CHAR:
+      return DatumFactory.createChar(getBytes(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getBytes(fieldId));
+    case BLOB:
+      return DatumFactory.createBlob(getBytes(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    case NULL_TYPE:
+      return NullDatum.get();
+    default:
+      throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
+    }
+  }
+
+  @Override
+  public void clearOffset() {
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return PlatformDependent.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return PlatformDependent.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = PlatformDependent.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    PlatformDependent.copyMemory(pos, bytes, 0, len);
+    return bytes;
+  }
+
+  @Override
+  public byte[] getTextBytes(int fieldId) {
+    return asDatum(fieldId).asTextBytes();
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return PlatformDependent.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return PlatformDependent.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return PlatformDependent.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return Float.intBitsToFloat(PlatformDependent.getInt(getFieldAddr(fieldId)));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return Double.longBitsToDouble(PlatformDependent.getLong(getFieldAddr(fieldId)));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId), TextDatum.DEFAULT_CHARSET);
+  }
+
+  @Override
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = PlatformDependent.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = PlatformDependent.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId]);
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = PlatformDependent.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    PlatformDependent.copyMemory(pos, bytes, 0, len);
+    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
+  }
+
+  @Override
+  public TimeMeta getTimeDate(int fieldId) {
+    return asDatum(fieldId).asTimeMeta();
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = asDatum(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  public void release() {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..53a78a8
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+        /*
+         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+         * time is no slower than comparing 4 bytes at a time even on 32-bit.
+         * On the other hand, it is substantially faster on 64-bit.
+         */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Use binary search
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
new file mode 100644
index 0000000..1f4f57e
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/tuple/memory/ZeroCopyTuple.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.storage.Tuple;
+
+public abstract class ZeroCopyTuple implements Tuple {
+
+  protected int relativePos;
+  protected int length;
+
+  public abstract void set(MemoryBlock memoryBlock, int relativePos, int length, DataType[] types);
+
+  void set(int relativePos, int length) {
+    this.relativePos = relativePos;
+    this.length = length;
+  }
+
+  public int getRelativePos() {
+    return relativePos;
+  }
+
+  public int getLength() {
+    return length;
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return (Tuple) super.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
index ff6072e..575e628 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/UnsafeUtil.java
@@ -18,7 +18,7 @@
 package org.apache.tajo.util;
 
 import com.google.common.base.Preconditions;
-import sun.misc.Cleaner;
+import io.netty.util.internal.PlatformDependent;
 import sun.misc.Unsafe;
 import sun.nio.ch.DirectBuffer;
 
@@ -132,12 +132,6 @@ public class UnsafeUtil {
   }
 
   public static void free(ByteBuffer bb) {
-    Preconditions.checkNotNull(bb);
-    Preconditions.checkState(bb.isDirect());
-
-    Cleaner cleaner = ((DirectBuffer) bb).cleaner();
-    if (cleaner != null) {
-      cleaner.clean();
-    }
+    PlatformDependent.freeDirectBuffer(bb);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
new file mode 100644
index 0000000..6ce1a6f
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.tuple.memory.*;
+import org.junit.Test;
+
+public class TestBaseTupleBuilder {
+
+  @Test
+  public void testBuild() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestMemoryRowBlock.schema);
+
+    MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(10248);
+    RowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new UnSafeTuple();
+
+    HeapTuple heapTuple;
+    ZeroCopyTuple zcTuple;
+    int i = 0;
+    while(reader.next(inputTuple)) {
+      OffHeapRowBlockUtils.convert(inputTuple, builder);
+
+      zcTuple = builder.buildToZeroCopyTuple();
+      TestMemoryRowBlock.validateTupleResult(i, zcTuple);
+
+      heapTuple = builder.buildToHeapTuple();
+      TestMemoryRowBlock.validateTupleResult(i, heapTuple);
+
+      i++;
+    }
+    builder.release();
+    rowBlock.release();
+  }
+
+  @Test
+  public void testBuildWithNull() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestMemoryRowBlock.schema);
+
+    MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlockWithNull(10248);
+    RowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new UnSafeTuple();
+
+    HeapTuple heapTuple;
+    ZeroCopyTuple zcTuple;
+    int i = 0;
+    while(reader.next(inputTuple)) {
+      OffHeapRowBlockUtils.convert(inputTuple, builder);
+
+      heapTuple = builder.buildToHeapTuple();
+      TestMemoryRowBlock.validateNullity(i, heapTuple);
+
+      zcTuple = builder.buildToZeroCopyTuple();
+      TestMemoryRowBlock.validateNullity(i, zcTuple);
+
+      i++;
+    }
+
+    builder.release();
+    rowBlock.release();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java
new file mode 100644
index 0000000..2b45428
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestHeapTuple.java
@@ -0,0 +1,82 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.tajo.storage.BufferPool;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestHeapTuple {
+
+  @Test
+  public void testHeapTupleFromOffheap() {
+    MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(1024);
+    assertTrue(rowBlock.getMemory().getBuffer().isDirect());
+    assertTrue(rowBlock.getMemory().hasAddress());
+
+    RowBlockReader reader = rowBlock.getReader();
+    assertEquals(OffHeapRowBlockReader.class, reader.getClass());
+
+    UnSafeTuple zcTuple = new UnSafeTuple();
+    int i = 0;
+    while (reader.next(zcTuple)) {
+
+      HeapTuple heapTuple = zcTuple.toHeapTuple();
+      TestMemoryRowBlock.validateTupleResult(i, heapTuple);
+      TestMemoryRowBlock.validateTupleResult(i, zcTuple);
+      TestMemoryRowBlock.validateTupleResult(i, zcTuple.toHeapTuple());
+      i++;
+    }
+
+    assertEquals(rowBlock.rows(), i);
+    rowBlock.release();
+  }
+
+  @Test
+  public void testHeapTupleFromHeap() throws CloneNotSupportedException {
+    MemoryRowBlock rowBlock = TestMemoryRowBlock.createRowBlock(1024);
+    int length = rowBlock.getMemory().writerPosition();
+    //write rows to heap
+    ByteBuf heapBuffer = BufferPool.heapBuffer(length, length);
+    heapBuffer.writeBytes(rowBlock.getMemory().getBuffer());
+    assertFalse(heapBuffer.isDirect());
+
+    ResizableMemoryBlock memoryBlock =
+        new ResizableMemoryBlock(heapBuffer);
+    assertFalse(memoryBlock.hasAddress());
+
+
+    RowBlockReader reader = new HeapRowBlockReader(memoryBlock, rowBlock.getDataTypes(), rowBlock.rows());
+    assertEquals(HeapRowBlockReader.class, reader.getClass());
+    HeapTuple heapTuple = new HeapTuple();
+    int i = 0;
+    while (reader.next(heapTuple)) {
+
+      TestMemoryRowBlock.validateTupleResult(i, heapTuple);
+      TestMemoryRowBlock.validateTupleResult(i, heapTuple.clone());
+      i++;
+    }
+    assertEquals(rowBlock.rows(), i);
+    rowBlock.release();
+    memoryBlock.release();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
new file mode 100644
index 0000000..a6003c7
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestMemoryRowBlock.java
@@ -0,0 +1,595 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.NumberUtil;
+import org.apache.tajo.util.ProtoUtil;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestMemoryRowBlock {
+  private static final Log LOG = LogFactory.getLog(TestMemoryRowBlock.class);
+  public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
+  public static DataType[] schema;
+
+  static {
+    schema = new DataType[] {
+        DataType.newBuilder().setType(Type.BOOLEAN).build(),
+        DataType.newBuilder().setType(Type.INT2).build(),
+        DataType.newBuilder().setType(Type.INT4).build(),
+        DataType.newBuilder().setType(Type.INT8).build(),
+        DataType.newBuilder().setType(Type.FLOAT4).build(),
+        DataType.newBuilder().setType(Type.FLOAT8).build(),
+        DataType.newBuilder().setType(Type.TEXT).build(),
+        DataType.newBuilder().setType(Type.TIMESTAMP).build(),
+        DataType.newBuilder().setType(Type.DATE).build(),
+        DataType.newBuilder().setType(Type.TIME).build(),
+        DataType.newBuilder().setType(Type.INTERVAL).build(),
+        DataType.newBuilder().setType(Type.INET4).build(),
+        DataType.newBuilder().setType(Type.PROTOBUF).setCode(PrimitiveProtos.StringProto.class.getName()).build()
+    };
+  }
+
+  private void explainRowBlockAllocation(MemoryRowBlock rowBlock, long startTime, long endTime) {
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated "
+        + (endTime - startTime) + " msec");
+  }
+
+  @Test
+  public void testPutAndReadValidation() {
+    int rowNum = 1000;
+
+    long allocStart = System.currentTimeMillis();
+    MemoryRowBlock rowBlock = new MemoryRowBlock(schema, 1024);
+    long allocEnd = System.currentTimeMillis();
+    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+    RowBlockReader reader = null;
+
+    ZeroCopyTuple tuple = new UnSafeTuple();
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRow(i, rowBlock.getWriter());
+
+      reader = rowBlock.getReader();
+      int j = 0;
+      while(reader.next(tuple)) {
+        validateTupleResult(j, tuple);
+        j++;
+      }
+    }
+
+    assertNotNull(reader);
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+    long readStart = System.currentTimeMillis();
+    tuple = new UnSafeTuple();
+    int j = 0;
+    reader.reset();
+    while(reader.next(tuple)) {
+      validateTupleResult(j, tuple);
+      j++;
+    }
+    assertEquals(rowNum, j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testNullityValidation() {
+    int rowNum = 1000;
+
+    long allocStart = System.currentTimeMillis();
+    MemoryRowBlock rowBlock = new MemoryRowBlock(schema, 1024);
+    long allocEnd = System.currentTimeMillis();
+    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+    RowBlockReader reader = null;
+    ZeroCopyTuple tuple = new UnSafeTuple();
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+
+      fillRowBlockWithNull(i, rowBlock.getWriter());
+
+      reader = rowBlock.getReader();
+      int j = 0;
+      while(reader.next(tuple)) {
+        validateNullity(j, tuple);
+
+        j++;
+      }
+    }
+
+    assertNotNull(reader);
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and nullity validating take " + (writeEnd - writeStart) + " msec");
+
+    long readStart = System.currentTimeMillis();
+    tuple = new UnSafeTuple();
+    int j = 0;
+    reader.reset();
+    while(reader.next(tuple)) {
+      validateNullity(j, tuple);
+
+      j++;
+    }
+    assertEquals(rowNum, j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testEmptyRow() {
+    int rowNum = 1000;
+
+    long allocStart = System.currentTimeMillis();
+    MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 10);
+    long allocEnd = System.currentTimeMillis();
+    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      rowBlock.getWriter().startRow();
+      // empty columns
+      rowBlock.getWriter().endRow();
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing tooks " + (writeEnd - writeStart) + " msec");
+
+    RowBlockReader reader = rowBlock.getReader();
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new UnSafeTuple();
+    int j = 0;
+    reader.reset();
+    while(reader.next(tuple)) {
+      j++;
+    }
+
+    assertEquals(rowNum, j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    rowBlock.release();
+
+    assertEquals(rowNum, j);
+    assertEquals(rowNum, rowBlock.rows());
+  }
+
+  @Test
+  public void testSortBenchmark() {
+    int rowNum = 1000;
+
+    MemoryRowBlock rowBlock = createRowBlock(rowNum);
+    List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList();
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new UnSafeTuple();
+
+    RowBlockReader reader = rowBlock.getReader();
+    while(reader.next(tuple)) {
+      unSafeTuples.add(tuple);
+      tuple = new UnSafeTuple();
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    long sortStart = System.currentTimeMillis();
+    Collections.sort(unSafeTuples, new Comparator<ZeroCopyTuple>() {
+      @Override
+      public int compare(ZeroCopyTuple t1, ZeroCopyTuple t2) {
+        return NumberUtil.compare(t1.getInt4(2), t2.getInt4(2));
+      }
+    });
+    long sortEnd = System.currentTimeMillis();
+    LOG.info("sorting took " + (sortEnd - sortStart) + " msec");
+    rowBlock.release();
+  }
+
+  @Test
+  public void testVTuplePutAndGetBenchmark() {
+    int rowNum = 1000;
+
+    List<VTuple> rowBlock = Lists.newArrayList();
+    long writeStart = System.currentTimeMillis();
+    VTuple tuple;
+    for (int i = 0; i < rowNum; i++) {
+      tuple = new VTuple(schema.length);
+      fillVTuple(i, tuple);
+      rowBlock.add(tuple);
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+    long readStart = System.currentTimeMillis();
+    int j = 0;
+    for (VTuple t : rowBlock) {
+      validateTupleResult(j, t);
+      j++;
+    }
+
+    assertEquals(rowNum, j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    int count = 0;
+    for (int l = 0; l < rowBlock.size(); l++) {
+      for(int m = 0; m < schema.length; m++ ) {
+        if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) {
+          count ++;
+        }
+      }
+    }
+    // For preventing unnecessary code elimination optimization.
+    LOG.info("The number of INT4 values is " + count + ".");
+  }
+
+  @Test
+  public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() {
+    int rowNum = 1000;
+
+    MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 100);
+
+    long writeStart = System.currentTimeMillis();
+    VTuple tuple = new VTuple(schema.length);
+    for (int i = 0; i < rowNum; i++) {
+      fillVTuple(i, tuple);
+      rowBlock.getWriter().addTuple(tuple);
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+    validateResults(rowBlock);
+    rowBlock.release();
+  }
+
+  @Test
+  public void testSerDerOfRowBlock() {
+    int rowNum = 1000;
+
+    MemoryRowBlock rowBlock = createRowBlock(rowNum);
+
+    MemoryRowBlock restoredRowBlock = new MemoryRowBlock(rowBlock);
+    validateResults(restoredRowBlock);
+    rowBlock.release();
+  }
+
+  @Test
+  public void testSerDerOfZeroCopyTuple() {
+    int rowNum = 1000;
+
+    MemoryRowBlock rowBlock = createRowBlock(rowNum);
+
+    MemoryRowBlock restoredRowBlock = new MemoryRowBlock(rowBlock);
+    RowBlockReader reader = restoredRowBlock.getReader();
+
+    long readStart = System.currentTimeMillis();
+    UnSafeTuple tuple = new UnSafeTuple();
+
+    int j = 0;
+    List<ZeroCopyTuple> copyTuples = Lists.newArrayList();
+
+    while (reader.next(tuple)) {
+      validateTupleResult(j, tuple);
+
+      UnSafeTuple copyTuple = new UnSafeTuple();
+      copyTuple.set(tuple);
+      copyTuples.add(copyTuple);
+
+      j++;
+    }
+
+    assertEquals(rowNum, j);
+
+    for (int i = 0; i < j; i++) {
+      validateTupleResult(i, copyTuples.get(i));
+    }
+
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    rowBlock.release();
+  }
+
+  public static MemoryRowBlock createRowBlock(int rowNum) {
+    long allocateStart = System.currentTimeMillis();
+    MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 8);
+    long allocatedEnd = System.currentTimeMillis();
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated "
+        + (allocatedEnd - allocateStart) + " msec");
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRow(i, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
+
+    return rowBlock;
+  }
+
+  public static MemoryRowBlock createRowBlockWithNull(int rowNum) {
+    long allocateStart = System.currentTimeMillis();
+    MemoryRowBlock rowBlock = new MemoryRowBlock(schema, StorageUnit.MB * 8);
+    long allocatedEnd = System.currentTimeMillis();
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.capacity(), true) + " bytes allocated "
+        + (allocatedEnd - allocateStart) + " msec");
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRowBlockWithNull(i, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+    return rowBlock;
+  }
+
+  public static void fillRow(int i, RowWriter builder) {
+    builder.startRow();
+    builder.putBool(i % 1 == 0 ? true : false); // 0
+    builder.putInt2((short) 1);                 // 1
+    builder.putInt4(i);                         // 2
+    builder.putInt8(i);                         // 3
+    builder.putFloat4(i);                       // 4
+    builder.putFloat8(i);                       // 5
+    builder.putText(UNICODE_FIELD_PREFIX + i);  // 6
+    builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+    builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+    builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+    builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+    builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+    builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+    builder.endRow();
+  }
+
+  public static void fillRowBlockWithNull(int i, RowWriter writer) {
+    writer.startRow();
+
+    if (i == 0) {
+      writer.skipField();
+    } else {
+      writer.putBool(i % 1 == 0 ? true : false); // 0
+    }
+    if (i % 1 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt2((short) 1);                 // 1
+    }
+
+    if (i % 2 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt4(i);                         // 2
+    }
+
+    if (i % 3 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt8(i);                         // 3
+    }
+
+    if (i % 4 == 0) {
+      writer.skipField();
+    } else {
+      writer.putFloat4(i);                       // 4
+    }
+
+    if (i % 5 == 0) {
+      writer.skipField();
+    } else {
+      writer.putFloat8(i);                       // 5
+    }
+
+    if (i % 6 == 0) {
+      writer.skipField();
+    } else {
+      writer.putText(UNICODE_FIELD_PREFIX + i);  // 6
+    }
+
+    if (i % 7 == 0) {
+      writer.skipField();
+    } else {
+      writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+    }
+
+    if (i % 8 == 0) {
+      writer.skipField();
+    } else {
+      writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+    }
+
+    if (i % 9 == 0) {
+      writer.skipField();
+    } else {
+      writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+    }
+
+    if (i % 10 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+    }
+
+    if (i % 11 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+    }
+
+    if (i % 12 == 0) {
+      writer.skipField();
+    } else {
+      writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+    }
+
+    writer.endRow();
+  }
+
+  public static void fillVTuple(int i, VTuple tuple) {
+    tuple.put(0, DatumFactory.createBool(i % 1 == 0));
+    tuple.put(1, DatumFactory.createInt2((short) 1));
+    tuple.put(2, DatumFactory.createInt4(i));
+    tuple.put(3, DatumFactory.createInt8(i));
+    tuple.put(4, DatumFactory.createFloat4(i));
+    tuple.put(5, DatumFactory.createFloat8(i));
+    tuple.put(6, DatumFactory.createText(UNICODE_FIELD_PREFIX + i));
+    tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7
+    tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8
+    tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9
+    tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10
+    tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11
+    tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12;
+  }
+
+  public static void validateResults(MemoryRowBlock rowBlock) {
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new UnSafeTuple();
+    int j = 0;
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    reader.reset();
+    while(reader.next(tuple)) {
+      validateTupleResult(j, tuple);
+      j++;
+    }
+    assertEquals(rowBlock.rows(), j);
+    long readEnd = System.currentTimeMillis();
+    LOG.info("Reading takes " + (readEnd - readStart) + " msec");
+  }
+
+  public static void validateTupleResult(int j, Tuple t) {
+    assertTrue((j % 1 == 0) == t.getBool(0));
+    assertTrue(1 == t.getInt2(1));
+    assertEquals(j, t.getInt4(2));
+    assertEquals(j, t.getInt8(3));
+    assertTrue(j == t.getFloat4(4));
+    assertTrue(j == t.getFloat8(5));
+    assertEquals(UNICODE_FIELD_PREFIX + j, t.getText(6));
+    assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7));
+    assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8));
+    assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9));
+    assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10));
+    assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11));
+    assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12));
+  }
+
+  public static void validateNullity(int j, Tuple tuple) {
+    if (j == 0) {
+      tuple.isBlankOrNull(0);
+    } else {
+      assertTrue((j % 1 == 0) == tuple.getBool(0));
+    }
+
+    if (j % 1 == 0) {
+      tuple.isBlankOrNull(1);
+    } else {
+      assertTrue(1 == tuple.getInt2(1));
+    }
+
+    if (j % 2 == 0) {
+      tuple.isBlankOrNull(2);
+    } else {
+      assertEquals(j, tuple.getInt4(2));
+    }
+
+    if (j % 3 == 0) {
+      tuple.isBlankOrNull(3);
+    } else {
+      assertEquals(j, tuple.getInt8(3));
+    }
+
+    if (j % 4 == 0) {
+      tuple.isBlankOrNull(4);
+    } else {
+      assertTrue(j == tuple.getFloat4(4));
+    }
+
+    if (j % 5 == 0) {
+      tuple.isBlankOrNull(5);
+    } else {
+      assertTrue(j == tuple.getFloat8(5));
+    }
+
+    if (j % 6 == 0) {
+      tuple.isBlankOrNull(6);
+    } else {
+      assertEquals(UNICODE_FIELD_PREFIX + j, tuple.getText(6));
+    }
+
+    if (j % 7 == 0) {
+      tuple.isBlankOrNull(7);
+    } else {
+      assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
+    }
+
+    if (j % 8 == 0) {
+      tuple.isBlankOrNull(8);
+    } else {
+      assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
+    }
+
+    if (j % 9 == 0) {
+      tuple.isBlankOrNull(9);
+    } else {
+      assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
+    }
+
+    if (j % 10 == 0) {
+      tuple.isBlankOrNull(10);
+    } else {
+      assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
+    }
+
+    if (j % 11 == 0) {
+      tuple.isBlankOrNull(11);
+    } else {
+      assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
+    }
+
+    if (j % 12 == 0) {
+      tuple.isBlankOrNull(12);
+    } else {
+      assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java
new file mode 100644
index 0000000..483dad5
--- /dev/null
+++ b/tajo-common/src/test/java/org/apache/tajo/tuple/memory/TestResizableSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.memory;
+
+import org.apache.tajo.unit.StorageUnit;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestResizableSpec {
+
+  @Test
+  public void testResizableLimit() {
+    ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f);
+
+    long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
+
+    assertTrue(limit.limit() == 1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
+
+    assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB));
+
+    assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB));
+
+    assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1);
+
+    assertFalse(limit.canIncrease(limit.limit()));
+  }
+
+  @Test
+  public void testFixedLimit() {
+    FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f);
+
+    assertEquals(limit.limit(), 100 * StorageUnit.MB);
+
+    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000));
+
+    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB));
+
+    assertTrue(0.98f == limit.remainRatio(98 * StorageUnit.MB));
+
+    assertFalse(limit.canIncrease(limit.limit()));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-core-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core-tests/pom.xml b/tajo-core-tests/pom.xml
index a23e420..8199f46 100644
--- a/tajo-core-tests/pom.xml
+++ b/tajo-core-tests/pom.xml
@@ -225,6 +225,10 @@
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
           <artifactId>jersey-test-framework-grizzly2</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -280,12 +284,24 @@
       <version>${hbase.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-server</artifactId>
       <version>${hbase.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-jdbc/pom.xml b/tajo-jdbc/pom.xml
index 27cc471..09fabdd 100644
--- a/tajo-jdbc/pom.xml
+++ b/tajo-jdbc/pom.xml
@@ -147,6 +147,10 @@
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
           <artifactId>jersey-test-framework-grizzly2</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml
index afbeead..4321a13 100644
--- a/tajo-storage/tajo-storage-common/pom.xml
+++ b/tajo-storage/tajo-storage-common/pom.xml
@@ -222,6 +222,10 @@ limitations under the License.
           <groupId>com.sun.jersey.jersey-test-framework</groupId>
           <artifactId>jersey-test-framework-grizzly2</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -273,6 +277,10 @@ limitations under the License.
           <artifactId>hadoop-mapreduce-client-core</artifactId>
           <groupId>org.apache.hadoop</groupId>
         </exclusion>
+        <exclusion>
+          <artifactId>netty-all</artifactId>
+          <groupId>io.netty</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
deleted file mode 100644
index d611ee3..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/BufferPool.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.util.ResourceLeakDetector;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.util.CommonTestingUtil;
-
-import java.lang.reflect.Field;
-
-/* this class is PooledBuffer holder */
-public class BufferPool {
-
-  public static final String ALLOW_CACHE = "tajo.storage.buffer.thread-local.cache";
-  private static final ByteBufAllocator ALLOCATOR;
-
-  private BufferPool() {
-  }
-
-  static {
-    /* TODO Enable thread cache
-    *  Create a pooled ByteBuf allocator but disables the thread-local cache.
-    *  Because the TaskRunner thread is newly created
-    * */
-
-    if (System.getProperty(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
-      /* Disable pooling buffers for memory usage  */
-      ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
-
-      /* if you are finding memory leak, please enable this line */
-      ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.ADVANCED);
-    } else {
-      TajoConf tajoConf = new TajoConf();
-      ALLOCATOR = createPooledByteBufAllocator(true, tajoConf.getBoolean(ALLOW_CACHE, false), 0);
-    }
-  }
-
-  /**
-   * borrowed from Spark
-   */
-  public static PooledByteBufAllocator createPooledByteBufAllocator(
-      boolean allowDirectBufs,
-      boolean allowCache,
-      int numCores) {
-    if (numCores == 0) {
-      numCores = Runtime.getRuntime().availableProcessors();
-    }
-    return new PooledByteBufAllocator(
-        allowDirectBufs && PlatformDependent.directBufferPreferred(),
-        Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
-        Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
-        getPrivateStaticField("DEFAULT_PAGE_SIZE"),
-        getPrivateStaticField("DEFAULT_MAX_ORDER"),
-        allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
-        allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
-        allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
-    );
-  }
-
-  /** Used to get defaults from Netty's private static fields. */
-  private static int getPrivateStaticField(String name) {
-    try {
-      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
-      f.setAccessible(true);
-      return f.getInt(null);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static long maxDirectMemory() {
-    return PlatformDependent.maxDirectMemory();
-  }
-
-
-  public static ByteBuf directBuffer(int size) {
-    return ALLOCATOR.directBuffer(size);
-  }
-
-  /**
-   *
-   * @param size the initial capacity
-   * @param max the max capacity
-   * @return allocated ByteBuf from pool
-   */
-  public static ByteBuf directBuffer(int size, int max) {
-    return ALLOCATOR.directBuffer(size, max);
-  }
-
-  @InterfaceStability.Unstable
-  public static void forceRelease(ByteBuf buf) {
-    buf.release(buf.refCnt());
-  }
-
-  /**
-   * the ByteBuf will increase to writable size
-   * @param buf
-   * @param minWritableBytes required minimum writable size
-   */
-  public static void ensureWritable(ByteBuf buf, int minWritableBytes) {
-    buf.ensureWritable(minWritableBytes);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
index 7708d52..8ca55cc 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/RowStoreUtil.java
@@ -23,11 +23,9 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
-import org.apache.tajo.tuple.offheap.RowWriter;
 import org.apache.tajo.util.BitArray;
 
 import java.nio.ByteBuffer;
@@ -335,56 +333,4 @@ public class RowStoreUtil {
       return schema;
     }
   }
-
-  public static void convert(Tuple tuple, RowWriter writer) {
-    writer.startRow();
-
-    for (int i = 0; i < writer.dataTypes().length; i++) {
-      if (tuple.isBlankOrNull(i)) {
-        writer.skipField();
-        continue;
-      }
-      switch (writer.dataTypes()[i].getType()) {
-      case BOOLEAN:
-        writer.putBool(tuple.getBool(i));
-        break;
-      case INT1:
-      case INT2:
-        writer.putInt2(tuple.getInt2(i));
-        break;
-      case INT4:
-      case DATE:
-      case INET4:
-        writer.putInt4(tuple.getInt4(i));
-        break;
-      case INT8:
-      case TIMESTAMP:
-      case TIME:
-        writer.putInt8(tuple.getInt8(i));
-        break;
-      case FLOAT4:
-        writer.putFloat4(tuple.getFloat4(i));
-        break;
-      case FLOAT8:
-        writer.putFloat8(tuple.getFloat8(i));
-        break;
-      case TEXT:
-        writer.putText(tuple.getBytes(i));
-        break;
-      case INTERVAL:
-        writer.putInterval((IntervalDatum) tuple.getInterval(i));
-        break;
-      case PROTOBUF:
-        writer.putProtoDatum((ProtobufDatum) tuple.getProtobufDatum(i));
-        break;
-      case NULL_TYPE:
-        writer.skipField();
-        break;
-      default:
-        throw new TajoRuntimeException(
-            new UnsupportedException("unknown data type '" + writer.dataTypes()[i].getType().name() + "'"));
-      }
-    }
-    writer.endRow();
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
deleted file mode 100644
index c1835df..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.HeapTuple;
-import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
-import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
-import org.apache.tajo.unit.StorageUnit;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
-  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
-
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  // buffer
-  private ByteBuffer buffer;
-  private long address;
-
-  public BaseTupleBuilder(Schema schema) {
-    super(SchemaUtil.toDataTypes(schema));
-    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
-    address = UnsafeUtil.getAddress(buffer);
-  }
-
-  @Override
-  public long address() {
-    return address;
-  }
-
-  public void ensureSize(int size) {
-    if (buffer.remaining() - size < 0) { // check the remain size
-      // enlarge new buffer and copy writing data
-      int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
-      ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
-      long newAddress = ((DirectBuffer)newByteBuf).address();
-      UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
-      LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-
-      // release existing buffer and replace variables
-      UnsafeUtil.free(buffer);
-      buffer = newByteBuf;
-      address = newAddress;
-    }
-  }
-
-  @Override
-  public int position() {
-    return 0;
-  }
-
-  @Override
-  public void forward(int length) {
-  }
-
-  @Override
-  public void endRow() {
-    super.endRow();
-    buffer.position(0).limit(offset());
-  }
-
-  @Override
-  public Tuple build() {
-    return buildToHeapTuple();
-  }
-
-  public HeapTuple buildToHeapTuple() {
-    byte [] bytes = new byte[buffer.limit()];
-    UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit());
-    return new HeapTuple(bytes, dataTypes());
-  }
-
-  public ZeroCopyTuple buildToZeroCopyTuple() {
-    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
-    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
-    return zcTuple;
-  }
-
-  public void release() {
-    UnsafeUtil.free(buffer);
-    buffer = null;
-    address = 0;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
deleted file mode 100644
index be734e1..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-
-public interface RowBlockReader<T extends Tuple> {
-
-  /**
-   * Return for each tuple
-   *
-   * @return True if tuple block is filled with tuples. Otherwise, It will return false.
-   */
-  public boolean next(T tuple);
-
-  public void reset();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
deleted file mode 100644
index c43c018..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple;
-
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.tuple.offheap.RowWriter;
-
-public interface TupleBuilder extends RowWriter {
-  public Tuple build();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
deleted file mode 100644
index 9662d5a..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.UnsafeUtil;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
-  private ByteBuffer bb;
-
-  public DirectBufTuple(int length, DataType[] types) {
-    bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
-    set(bb, 0, length, types);
-  }
-
-  @Override
-  public void release() {
-    UnsafeUtil.free(bb);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
deleted file mode 100644
index a327123..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-/**
- * Fixed size limit specification
- */
-public class FixedSizeLimitSpec extends ResizableLimitSpec {
-  public FixedSizeLimitSpec(long size) {
-    super(size, size);
-  }
-
-  public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
-    super(size, size, allowedOverflowRatio);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
deleted file mode 100644
index 9b69536..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.exception.TajoRuntimeException;
-import org.apache.tajo.exception.UnsupportedException;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.SizeOf;
-import org.apache.tajo.util.StringUtils;
-import org.apache.tajo.util.UnsafeUtil;
-import org.apache.tajo.util.datetime.TimeMeta;
-import sun.misc.Unsafe;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class HeapTuple implements Tuple {
-  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-  private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
-
-  private final byte [] data;
-  private final DataType [] types;
-
-  public HeapTuple(final byte [] bytes, final DataType [] types) {
-    this.data = bytes;
-    this.types = types;
-  }
-
-  @Override
-  public int size() {
-    return data.length;
-  }
-
-  @Override
-  public TajoDataTypes.Type type(int fieldId) {
-    return types[fieldId].getType();
-  }
-
-  @Override
-  public int size(int fieldId) {
-    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public void clearOffset() {
-  }
-
-  public ByteBuffer nioBuffer() {
-    return ByteBuffer.wrap(data);
-  }
-
-  private int getFieldOffset(int fieldId) {
-    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
-  }
-
-  private int checkNullAndGetOffset(int fieldId) {
-    int offset = getFieldOffset(fieldId);
-    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
-      throw new RuntimeException("Invalid Field Access: " + fieldId);
-    }
-    return offset;
-  }
-
-  @Override
-  public boolean contains(int fieldid) {
-    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isBlank(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public boolean isBlankOrNull(int fieldid) {
-    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new TajoRuntimeException(new UnsupportedException());
-  }
-
-  @Override
-  public void clear() {
-    // nothing to do
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new TajoRuntimeException(new UnsupportedException());
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    throw new TajoRuntimeException(new UnsupportedException());
-  }
-
-  @Override
-  public Datum asDatum(int fieldId) {
-    if (isBlankOrNull(fieldId)) {
-      return NullDatum.get();
-    }
-
-    switch (types[fieldId].getType()) {
-    case BOOLEAN:
-      return DatumFactory.createBool(getBool(fieldId));
-    case INT1:
-    case INT2:
-      return DatumFactory.createInt2(getInt2(fieldId));
-    case INT4:
-      return DatumFactory.createInt4(getInt4(fieldId));
-    case INT8:
-      return DatumFactory.createInt8(getInt4(fieldId));
-    case FLOAT4:
-      return DatumFactory.createFloat4(getFloat4(fieldId));
-    case FLOAT8:
-      return DatumFactory.createFloat8(getFloat8(fieldId));
-    case TEXT:
-      return DatumFactory.createText(getText(fieldId));
-    case TIMESTAMP:
-      return DatumFactory.createTimestamp(getInt8(fieldId));
-    case DATE:
-      return DatumFactory.createDate(getInt4(fieldId));
-    case TIME:
-      return DatumFactory.createTime(getInt8(fieldId));
-    case INTERVAL:
-      return getInterval(fieldId);
-    case INET4:
-      return DatumFactory.createInet4(getInt4(fieldId));
-    case PROTOBUF:
-      return getProtobufDatum(fieldId);
-    default:
-      throw new TajoRuntimeException(new UnsupportedException("data type '" + types[fieldId] + "'"));
-    }
-  }
-
-  @Override
-  public void setOffset(long offset) {
-  }
-
-  @Override
-  public long getOffset() {
-    return 0;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public byte[] getBytes(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return bytes;
-  }
-
-  @Override
-  public byte[] getTextBytes(int fieldId) {
-    return getText(fieldId).getBytes();
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return new String(getBytes(fieldId));
-  }
-
-  @Override
-  public TimeMeta getTimeDate(int fieldId) {
-    return asDatum(fieldId).asTimeMeta();
-  }
-
-  public IntervalDatum getInterval(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
-    return new IntervalDatum(months, millisecs);
-  }
-
-  @Override
-  public Datum getProtobufDatum(int fieldId) {
-    byte [] bytes = getBytes(fieldId);
-
-    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
-    Message.Builder builder = factory.newBuilder();
-    try {
-      builder.mergeFrom(bytes);
-    } catch (InvalidProtocolBufferException e) {
-      return NullDatum.get();
-    }
-
-    return new ProtobufDatum(builder.build());
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    long pos = checkNullAndGetOffset(fieldId);
-    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
-    pos += SizeOf.SIZE_OF_INT;
-
-    byte [] bytes = new byte[len];
-    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
-    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    return this;
-  }
-
-  @Override
-  public Datum[] getValues() {
-    Datum [] datums = new Datum[size()];
-    for (int i = 0; i < size(); i++) {
-      if (contains(i)) {
-        datums[i] = asDatum(i);
-      } else {
-        datums[i] = NullDatum.get();
-      }
-    }
-    return datums;
-  }
-
-  @Override
-  public String toString() {
-    return VTuple.toDisplayString(getValues());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
deleted file mode 100644
index 2f8e349..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.UnsafeUtil;
-import sun.misc.Unsafe;
-import sun.nio.ch.DirectBuffer;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-public class OffHeapMemory implements Deallocatable {
-  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
-
-  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
-
-  protected ByteBuffer buffer;
-  protected int memorySize;
-  protected ResizableLimitSpec limitSpec;
-  protected long address;
-
-  @VisibleForTesting
-  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
-    this.buffer = buffer;
-    this.address = ((DirectBuffer) buffer).address();
-    this.memorySize = buffer.limit();
-    this.limitSpec = limitSpec;
-  }
-
-  public OffHeapMemory(ResizableLimitSpec limitSpec) {
-    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
-  }
-
-  public long address() {
-    return address;
-  }
-
-  public long size() {
-    return memorySize;
-  }
-
-  public void resize(int newSize) {
-    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
-
-    if (newSize > limitSpec.limit()) {
-      throw new RuntimeException("Resize cannot exceed the size limit");
-    }
-
-    if (newSize < memorySize) {
-      LOG.warn("The size reduction is ignored.");
-    }
-
-    int newBlockSize = UnsafeUtil.alignedSize(newSize);
-    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
-    long newAddress = ((DirectBuffer)newByteBuf).address();
-
-    UNSAFE.copyMemory(this.address, newAddress, memorySize);
-
-    UnsafeUtil.free(buffer);
-    this.memorySize = newSize;
-    this.buffer = newByteBuf;
-    this.address = newAddress;
-  }
-
-  public java.nio.Buffer nioBuffer() {
-    return (ByteBuffer) buffer.position(0).limit(memorySize);
-  }
-
-  @Override
-  public void release() {
-    UnsafeUtil.free(this.buffer);
-    this.buffer = null;
-    this.address = 0;
-    this.memorySize = 0;
-  }
-
-  public String toString() {
-    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3a30f45c/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
deleted file mode 100644
index 90d4791..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/***
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.tuple.offheap;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SchemaUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.SeekableInputChannel;
-import org.apache.tajo.util.Deallocatable;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.SizeOf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-
-import static org.apache.tajo.common.TajoDataTypes.DataType;
-
-public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
-  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
-
-  public static final int NULL_FIELD_OFFSET = -1;
-
-  DataType [] dataTypes;
-
-  // Basic States
-  private int maxRowNum = Integer.MAX_VALUE; // optional
-  private int rowNum;
-  protected int position = 0;
-
-  private OffHeapRowBlockWriter builder;
-
-  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
-    super(buffer, limitSpec);
-    initialize(schema);
-  }
-
-  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
-    super(limitSpec);
-    initialize(schema);
-  }
-
-  private void initialize(Schema schema) {
-    dataTypes = SchemaUtil.toDataTypes(schema);
-
-    this.builder = new OffHeapRowBlockWriter(this);
-  }
-
-  @VisibleForTesting
-  public OffHeapRowBlock(Schema schema, int bytes) {
-    this(schema, new ResizableLimitSpec(bytes));
-  }
-
-  @VisibleForTesting
-  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
-    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
-  }
-
-  public void position(int pos) {
-    this.position = pos;
-  }
-
-  public void clear() {
-    this.position = 0;
-    this.rowNum = 0;
-
-    builder.clear();
-  }
-
-  @Override
-  public ByteBuffer nioBuffer() {
-    return (ByteBuffer) buffer.position(0).limit(position);
-  }
-
-  public int position() {
-    return position;
-  }
-
-  public long usedMem() {
-    return position;
-  }
-
-  /**
-   * Ensure that this buffer has enough remaining space to add the size.
-   * Creates and copies to a new buffer if necessary
-   *
-   * @param size Size to add
-   */
-  public void ensureSize(int size) {
-    if (remain() - size < 0) {
-      if (!limitSpec.canIncrease(memorySize)) {
-        throw new RuntimeException("Cannot increase RowBlock anymore.");
-      }
-
-      int newBlockSize = limitSpec.increasedSize(memorySize);
-      resize(newBlockSize);
-      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
-    }
-  }
-
-  public long remain() {
-    return memorySize - position - builder.offset();
-  }
-
-  public int maxRowNum() {
-    return maxRowNum;
-  }
-  public int rows() {
-    return rowNum;
-  }
-
-  public void setRows(int rowNum) {
-    this.rowNum = rowNum;
-  }
-
-
-  public boolean copyFromChannel(SeekableInputChannel channel, TableStats stats) throws IOException {
-    if (channel.position() < channel.size()) {
-      clear();
-
-      buffer.clear();
-      channel.read(buffer);
-      memorySize = buffer.position();
-
-      while (position < memorySize) {
-        long recordPtr = address + position;
-
-        if (remain() < SizeOf.SIZE_OF_INT) {
-          channel.seek(channel.position() - remain());
-          memorySize = (int) (memorySize - remain());
-          return true;
-        }
-
-        int recordSize = UNSAFE.getInt(recordPtr);
-
-        if (remain() < recordSize) {
-          channel.seek(channel.position() - remain());
-          memorySize = (int) (memorySize - remain());
-          return true;
-        }
-
-        position += recordSize;
-        rowNum++;
-      }
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
-    if (channel.position() < channel.size()) {
-      clear();
-
-      buffer.clear();
-      channel.read(buffer);
-      memorySize = buffer.position();
-
-      while (position < memorySize) {
-        long recordPtr = address + position;
-
-        if (remain() < SizeOf.SIZE_OF_INT) {
-          channel.position(channel.position() - remain());
-          memorySize = (int) (memorySize - remain());
-          return true;
-        }
-
-        int recordSize = UNSAFE.getInt(recordPtr);
-
-        if (remain() < recordSize) {
-          channel.position(channel.position() - remain());
-          memorySize = (int) (memorySize - remain());
-          return true;
-        }
-
-        position += recordSize;
-        rowNum++;
-      }
-
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  public RowWriter getWriter() {
-    return builder;
-  }
-
-  public OffHeapRowBlockReader getReader() {
-    return new OffHeapRowBlockReader(this);
-  }
-}