You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:34 UTC

[20/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
new file mode 100644
index 0000000..c1835df
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/BaseTupleBuilder.java
@@ -0,0 +1,112 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.HeapTuple;
+import org.apache.tajo.tuple.offheap.OffHeapRowWriter;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class BaseTupleBuilder extends OffHeapRowWriter implements TupleBuilder, Deallocatable {
+  private static final Log LOG = LogFactory.getLog(BaseTupleBuilder.class);
+
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  // buffer
+  private ByteBuffer buffer;
+  private long address;
+
+  public BaseTupleBuilder(Schema schema) {
+    super(SchemaUtil.toDataTypes(schema));
+    buffer = ByteBuffer.allocateDirect(64 * StorageUnit.KB).order(ByteOrder.nativeOrder());
+    address = UnsafeUtil.getAddress(buffer);
+  }
+
+  @Override
+  public long address() {
+    return address;
+  }
+
+  public void ensureSize(int size) {
+    if (buffer.remaining() - size < 0) { // check the remain size
+      // enlarge new buffer and copy writing data
+      int newBlockSize = UnsafeUtil.alignedSize(buffer.capacity() * 2);
+      ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
+      long newAddress = ((DirectBuffer)newByteBuf).address();
+      UNSAFE.copyMemory(this.address, newAddress, buffer.limit());
+      LOG.debug("Increase the buffer size to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+
+      // release existing buffer and replace variables
+      UnsafeUtil.free(buffer);
+      buffer = newByteBuf;
+      address = newAddress;
+    }
+  }
+
+  @Override
+  public int position() {
+    return 0;
+  }
+
+  @Override
+  public void forward(int length) {
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+    buffer.position(0).limit(offset());
+  }
+
+  @Override
+  public Tuple build() {
+    return buildToHeapTuple();
+  }
+
+  public HeapTuple buildToHeapTuple() {
+    byte [] bytes = new byte[buffer.limit()];
+    UNSAFE.copyMemory(null, address, bytes, UnsafeUtil.ARRAY_BOOLEAN_BASE_OFFSET, buffer.limit());
+    return new HeapTuple(bytes, dataTypes());
+  }
+
+  public ZeroCopyTuple buildToZeroCopyTuple() {
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    zcTuple.set(buffer, 0, buffer.limit(), dataTypes());
+    return zcTuple;
+  }
+
+  public void release() {
+    UnsafeUtil.free(buffer);
+    buffer = null;
+    address = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
new file mode 100644
index 0000000..be734e1
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/RowBlockReader.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+
+public interface RowBlockReader<T extends Tuple> {
+
+  /**
+   * Return for each tuple
+   *
+   * @return True if tuple block is filled with tuples. Otherwise, It will return false.
+   */
+  public boolean next(T tuple);
+
+  public void reset();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
new file mode 100644
index 0000000..c43c018
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/TupleBuilder.java
@@ -0,0 +1,26 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+public interface TupleBuilder extends RowWriter {
+  public Tuple build();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
new file mode 100644
index 0000000..9662d5a
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/DirectBufTuple.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.UnsafeUtil;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class DirectBufTuple extends UnSafeTuple implements Deallocatable {
+  private ByteBuffer bb;
+
+  public DirectBufTuple(int length, DataType[] types) {
+    bb = ByteBuffer.allocateDirect(length).order(ByteOrder.nativeOrder());
+    set(bb, 0, length, types);
+  }
+
+  @Override
+  public void release() {
+    UnsafeUtil.free(bb);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
new file mode 100644
index 0000000..a327123
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/FixedSizeLimitSpec.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+/**
+ * Fixed size limit specification
+ */
+public class FixedSizeLimitSpec extends ResizableLimitSpec {
+  public FixedSizeLimitSpec(long size) {
+    super(size, size);
+  }
+
+  public FixedSizeLimitSpec(long size, float allowedOverflowRatio) {
+    super(size, size, allowedOverflowRatio);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
new file mode 100644
index 0000000..33f9f1c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/HeapTuple.java
@@ -0,0 +1,272 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class HeapTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  private static final long BASE_OFFSET = UnsafeUtil.ARRAY_BYTE_BASE_OFFSET;
+
+  private final byte [] data;
+  private final DataType [] types;
+
+  public HeapTuple(final byte [] bytes, final DataType [] types) {
+    this.data = bytes;
+    this.types = types;
+  }
+
+  @Override
+  public int size() {
+    return data.length;
+  }
+
+  public ByteBuffer nioBuffer() {
+    return ByteBuffer.wrap(data);
+  }
+
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  private int checkNullAndGetOffset(int fieldId) {
+    int offset = getFieldOffset(fieldId);
+    if (offset == OffHeapRowBlock.NULL_FIELD_OFFSET) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return offset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt4(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    return UNSAFE.getShort(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(data, BASE_OFFSET + checkNullAndGetOffset(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    return new String(getBytes(fieldId));
+  }
+
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int months = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(data, BASE_OFFSET + pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    long pos = checkNullAndGetOffset(fieldId);
+    int len = UNSAFE.getInt(data, BASE_OFFSET + pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(data, BASE_OFFSET + pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return this;
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
new file mode 100644
index 0000000..2f8e349
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapMemory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapMemory.class);
+
+  protected static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  protected ByteBuffer buffer;
+  protected int memorySize;
+  protected ResizableLimitSpec limitSpec;
+  protected long address;
+
+  @VisibleForTesting
+  protected OffHeapMemory(ByteBuffer buffer, ResizableLimitSpec limitSpec) {
+    this.buffer = buffer;
+    this.address = ((DirectBuffer) buffer).address();
+    this.memorySize = buffer.limit();
+    this.limitSpec = limitSpec;
+  }
+
+  public OffHeapMemory(ResizableLimitSpec limitSpec) {
+    this(ByteBuffer.allocateDirect((int) limitSpec.initialSize()).order(ByteOrder.nativeOrder()), limitSpec);
+  }
+
+  public long address() {
+    return address;
+  }
+
+  public long size() {
+    return memorySize;
+  }
+
+  public void resize(int newSize) {
+    Preconditions.checkArgument(newSize > 0, "Size must be greater than 0 bytes");
+
+    if (newSize > limitSpec.limit()) {
+      throw new RuntimeException("Resize cannot exceed the size limit");
+    }
+
+    if (newSize < memorySize) {
+      LOG.warn("The size reduction is ignored.");
+    }
+
+    int newBlockSize = UnsafeUtil.alignedSize(newSize);
+    ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newBlockSize);
+    long newAddress = ((DirectBuffer)newByteBuf).address();
+
+    UNSAFE.copyMemory(this.address, newAddress, memorySize);
+
+    UnsafeUtil.free(buffer);
+    this.memorySize = newSize;
+    this.buffer = newByteBuf;
+    this.address = newAddress;
+  }
+
+  public java.nio.Buffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(memorySize);
+  }
+
+  @Override
+  public void release() {
+    UnsafeUtil.free(this.buffer);
+    this.buffer = null;
+    this.address = 0;
+    this.memorySize = 0;
+  }
+
+  public String toString() {
+    return "memory=" + FileUtil.humanReadableByteCount(memorySize, false) + "," + limitSpec;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
new file mode 100644
index 0000000..689efb7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -0,0 +1,176 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Deallocatable;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.SizeOf;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
+  private static final Log LOG = LogFactory.getLog(OffHeapRowBlock.class);
+
+  public static final int NULL_FIELD_OFFSET = -1;
+
+  DataType [] dataTypes;
+
+  // Basic States
+  private int maxRowNum = Integer.MAX_VALUE; // optional
+  private int rowNum;
+  protected int position = 0;
+
+  private OffHeapRowBlockWriter builder;
+
+  private OffHeapRowBlock(ByteBuffer buffer, Schema schema, ResizableLimitSpec limitSpec) {
+    super(buffer, limitSpec);
+    initialize(schema);
+  }
+
+  public OffHeapRowBlock(Schema schema, ResizableLimitSpec limitSpec) {
+    super(limitSpec);
+    initialize(schema);
+  }
+
+  private void initialize(Schema schema) {
+    dataTypes = SchemaUtil.toDataTypes(schema);
+
+    this.builder = new OffHeapRowBlockWriter(this);
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, int bytes) {
+    this(schema, new ResizableLimitSpec(bytes));
+  }
+
+  @VisibleForTesting
+  public OffHeapRowBlock(Schema schema, ByteBuffer buffer) {
+    this(buffer, schema, ResizableLimitSpec.DEFAULT_LIMIT);
+  }
+
+  public void position(int pos) {
+    this.position = pos;
+  }
+
+  public void clear() {
+    this.position = 0;
+    this.rowNum = 0;
+
+    builder.clear();
+  }
+
+  @Override
+  public ByteBuffer nioBuffer() {
+    return (ByteBuffer) buffer.position(0).limit(position);
+  }
+
+  public int position() {
+    return position;
+  }
+
+  public long usedMem() {
+    return position;
+  }
+
+  /**
+   * Ensure that this buffer has enough remaining space to add the size.
+   * Creates and copies to a new buffer if necessary
+   *
+   * @param size Size to add
+   */
+  public void ensureSize(int size) {
+    if (remain() - size < 0) {
+      if (!limitSpec.canIncrease(memorySize)) {
+        throw new RuntimeException("Cannot increase RowBlock anymore.");
+      }
+
+      int newBlockSize = limitSpec.increasedSize(memorySize);
+      resize(newBlockSize);
+      LOG.info("Increase DirectRowBlock to " + FileUtil.humanReadableByteCount(newBlockSize, false));
+    }
+  }
+
+  public long remain() {
+    return memorySize - position - builder.offset();
+  }
+
+  public int maxRowNum() {
+    return maxRowNum;
+  }
+  public int rows() {
+    return rowNum;
+  }
+
+  public void setRows(int rowNum) {
+    this.rowNum = rowNum;
+  }
+
+  public boolean copyFromChannel(FileChannel channel, TableStats stats) throws IOException {
+    if (channel.position() < channel.size()) {
+      clear();
+
+      buffer.clear();
+      channel.read(buffer);
+      memorySize = buffer.position();
+
+      while (position < memorySize) {
+        long recordPtr = address + position;
+
+        if (remain() < SizeOf.SIZE_OF_INT) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        int recordSize = UNSAFE.getInt(recordPtr);
+
+        if (remain() < recordSize) {
+          channel.position(channel.position() - remain());
+          memorySize = (int) (memorySize - remain());
+          return true;
+        }
+
+        position += recordSize;
+        rowNum++;
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public RowWriter getWriter() {
+    return builder;
+  }
+
+  public OffHeapRowBlockReader getReader() {
+    return new OffHeapRowBlockReader(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
new file mode 100644
index 0000000..4a9313f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockReader.java
@@ -0,0 +1,63 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+public class OffHeapRowBlockReader implements RowBlockReader<ZeroCopyTuple> {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+  OffHeapRowBlock rowBlock;
+
+  // Read States
+  private int curRowIdxForRead;
+  private int curPosForRead;
+
+  public OffHeapRowBlockReader(OffHeapRowBlock rowBlock) {
+    this.rowBlock = rowBlock;
+  }
+
+  public long remainForRead() {
+    return rowBlock.memorySize - curPosForRead;
+  }
+
+  @Override
+  public boolean next(ZeroCopyTuple tuple) {
+    if (curRowIdxForRead < rowBlock.rows()) {
+
+      long recordStartPtr = rowBlock.address() + curPosForRead;
+      int recordLen = UNSAFE.getInt(recordStartPtr);
+      tuple.set(rowBlock.buffer, curPosForRead, recordLen, rowBlock.dataTypes);
+
+      curPosForRead += recordLen;
+      curRowIdxForRead++;
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void reset() {
+    curPosForRead = 0;
+    curRowIdxForRead = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
new file mode 100644
index 0000000..dbc3188
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockUtils.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.storage.Tuple;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+public class OffHeapRowBlockUtils {
+
+  public static List<Tuple> sort(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    List<Tuple> tupleList = Lists.newArrayList();
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    while(reader.next(zcTuple)) {
+      tupleList.add(zcTuple);
+      zcTuple = new ZeroCopyTuple();
+    }
+    Collections.sort(tupleList, comparator);
+    return tupleList;
+  }
+
+  public static Tuple[] sortToArray(OffHeapRowBlock rowBlock, Comparator<Tuple> comparator) {
+    Tuple[] tuples = new Tuple[rowBlock.rows()];
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    for (int i = 0; i < rowBlock.rows() && reader.next(zcTuple); i++) {
+      tuples[i] = zcTuple;
+      zcTuple = new ZeroCopyTuple();
+    }
+    Arrays.sort(tuples, comparator);
+    return tuples;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
new file mode 100644
index 0000000..d177e0c
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+
+public class OffHeapRowBlockWriter extends OffHeapRowWriter {
+  OffHeapRowBlock rowBlock;
+
+  OffHeapRowBlockWriter(OffHeapRowBlock rowBlock) {
+    super(rowBlock.dataTypes);
+    this.rowBlock = rowBlock;
+  }
+
+  public long address() {
+    return rowBlock.address();
+  }
+
+  public int position() {
+    return rowBlock.position();
+  }
+
+  @Override
+  public void forward(int length) {
+    rowBlock.position(position() + length);
+  }
+
+  public void ensureSize(int size) {
+    rowBlock.ensureSize(size);
+  }
+
+  @Override
+  public void endRow() {
+    super.endRow();
+    rowBlock.setRows(rowBlock.rows() + 1);
+  }
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return rowBlock.dataTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
new file mode 100644
index 0000000..85c7e0b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowWriter.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+
+/**
+ *
+ * Row Record Structure
+ *
+ * | row length (4 bytes) | field 1 offset | field 2 offset | ... | field N offset| field 1 | field 2| ... | field N |
+ *                              4 bytes          4 bytes               4 bytes
+ *
+ */
+public abstract class OffHeapRowWriter implements RowWriter {
+  /** record size + offset list */
+  private final int headerSize;
+  /** field offsets */
+  private final int [] fieldOffsets;
+  private final TajoDataTypes.DataType [] dataTypes;
+
+  private int curFieldIdx;
+  private int curOffset;
+
+  public OffHeapRowWriter(final TajoDataTypes.DataType [] dataTypes) {
+    this.dataTypes = dataTypes;
+    fieldOffsets = new int[dataTypes.length];
+    headerSize = SizeOf.SIZE_OF_INT * (dataTypes.length + 1);
+  }
+
+  public void clear() {
+    curOffset = 0;
+    curFieldIdx = 0;
+  }
+
+  public long recordStartAddr() {
+    return address() + position();
+  }
+
+  public abstract long address();
+
+  public abstract void ensureSize(int size);
+
+  public int offset() {
+    return curOffset;
+  }
+
+  /**
+   * Current position
+   *
+   * @return The position
+   */
+  public abstract int position();
+
+  /**
+   * Forward the address;
+   *
+   * @param length Length to be forwarded
+   */
+  public abstract void forward(int length);
+
+  @Override
+  public TajoDataTypes.DataType[] dataTypes() {
+    return dataTypes;
+  }
+
+  public boolean startRow() {
+    curOffset = headerSize;
+    curFieldIdx = 0;
+    return true;
+  }
+
+  public void endRow() {
+    long rowHeaderPos = address() + position();
+    OffHeapMemory.UNSAFE.putInt(rowHeaderPos, curOffset);
+    rowHeaderPos += SizeOf.SIZE_OF_INT;
+
+    for (int i = 0; i < curFieldIdx; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, fieldOffsets[i]);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+    for (int i = curFieldIdx; i < dataTypes.length; i++) {
+      OffHeapMemory.UNSAFE.putInt(rowHeaderPos, OffHeapRowBlock.NULL_FIELD_OFFSET);
+      rowHeaderPos += SizeOf.SIZE_OF_INT;
+    }
+
+    // rowOffset is equivalent to a byte length of this row.
+    forward(curOffset);
+  }
+
+  public void skipField() {
+    fieldOffsets[curFieldIdx++] = OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  private void forwardField() {
+    fieldOffsets[curFieldIdx++] = curOffset;
+  }
+
+  public void putBool(boolean val) {
+    ensureSize(SizeOf.SIZE_OF_BOOL);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putByte(recordStartAddr() + curOffset, (byte) (val ? 0x01 : 0x00));
+
+    curOffset += SizeOf.SIZE_OF_BOOL;
+  }
+
+  public void putInt2(short val) {
+    ensureSize(SizeOf.SIZE_OF_SHORT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putShort(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_SHORT;
+  }
+
+  public void putInt4(int val) {
+    ensureSize(SizeOf.SIZE_OF_INT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_INT;
+  }
+
+  public void putInt8(long val) {
+    ensureSize(SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putLong(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putFloat4(float val) {
+    ensureSize(SizeOf.SIZE_OF_FLOAT);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putFloat(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_FLOAT;
+  }
+
+  public void putFloat8(double val) {
+    ensureSize(SizeOf.SIZE_OF_DOUBLE);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putDouble(recordStartAddr() + curOffset, val);
+    curOffset += SizeOf.SIZE_OF_DOUBLE;
+  }
+
+  public void putText(String val) {
+    byte[] bytes = val.getBytes(TextDatum.DEFAULT_CHARSET);
+    putText(bytes);
+  }
+
+  public void putText(byte[] val) {
+    int bytesLen = val.length;
+
+    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
+        recordStartAddr() + curOffset, bytesLen);
+    curOffset += bytesLen;
+  }
+
+  public void putBlob(byte[] val) {
+    int bytesLen = val.length;
+
+    ensureSize(SizeOf.SIZE_OF_INT + bytesLen);
+    forwardField();
+
+    OffHeapMemory.UNSAFE.putInt(recordStartAddr() + curOffset, bytesLen);
+    curOffset += SizeOf.SIZE_OF_INT;
+
+    OffHeapMemory.UNSAFE.copyMemory(val, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, null,
+        recordStartAddr() + curOffset, bytesLen);
+    curOffset += bytesLen;
+  }
+
+  public void putTimestamp(long val) {
+    putInt8(val);
+  }
+
+  public void putDate(int val) {
+    putInt4(val);
+  }
+
+  public void putTime(long val) {
+    putInt8(val);
+  }
+
+  public void putInterval(IntervalDatum val) {
+    ensureSize(SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG);
+    forwardField();
+
+    long offset = recordStartAddr() + curOffset;
+    OffHeapMemory.UNSAFE.putInt(offset, val.getMonths());
+    offset += SizeOf.SIZE_OF_INT;
+    OffHeapMemory.UNSAFE.putLong(offset, val.getMilliSeconds());
+    curOffset += SizeOf.SIZE_OF_INT + SizeOf.SIZE_OF_LONG;
+  }
+
+  public void putInet4(int val) {
+    putInt4(val);
+  }
+
+  public void putProtoDatum(ProtobufDatum val) {
+    putBlob(val.asByteArray());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * It specifies the maximum size or increasing ratio. In addition,
+ * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31
+ * due to ByteBuffer.
+ */
+public class ResizableLimitSpec {
+  private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOVerflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    if (initSize == limitBytes) {
+      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
+
+      if (overflowedSize > Integer.MAX_VALUE) {
+        overflowedSize = Integer.MAX_VALUE;
+      }
+
+      this.initSize = overflowedSize;
+      this.limitBytes = overflowedSize;
+    } else {
+      this.initSize = initSize;
+      limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
+
+      if (limitBytes > Integer.MAX_VALUE) {
+        this.limitBytes = Integer.MAX_VALUE;
+      } else {
+        this.limitBytes = limitBytes;
+      }
+    }
+
+    this.allowedOVerflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  public long initialSize() {
+    return initSize;
+  }
+
+  public long limit() {
+    return limitBytes;
+  }
+
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
+  }
+
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    if (currentSize > Integer.MAX_VALUE) {
+      LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
+      return Integer.MAX_VALUE;
+    }
+    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    if (nextSize > Integer.MAX_VALUE) {
+      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
+      nextSize = Integer.MAX_VALUE;
+    }
+
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..a2b2561
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * The call sequence should be as follows:
+ *
+ * <pre>
+ *   startRow() -->  skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
+ */
+public interface RowWriter {
+
+  public TajoDataTypes.DataType [] dataTypes();
+
+  public boolean startRow();
+
+  public void endRow();
+
+  public void skipField();
+
+  public void putBool(boolean val);
+
+  public void putInt2(short val);
+
+  public void putInt4(int val);
+
+  public void putInt8(long val);
+
+  public void putFloat4(float val);
+
+  public void putFloat8(double val);
+
+  public void putText(String val);
+
+  public void putText(byte[] val);
+
+  public void putBlob(byte[] val);
+
+  public void putTimestamp(long val);
+
+  public void putTime(long val);
+
+  public void putDate(int val);
+
+  public void putInterval(IntervalDatum val);
+
+  public void putInet4(int val);
+
+  public void putProtoDatum(ProtobufDatum datum);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..b742e6d
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,311 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public abstract class UnSafeTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  private DirectBuffer bb;
+  private int relativePos;
+  private int length;
+  private DataType [] types;
+
+  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    this.bb = (DirectBuffer) bb;
+    this.relativePos = relativePos;
+    this.length = length;
+    this.types = types;
+  }
+
+  void set(ByteBuffer bb, DataType [] types) {
+    set(bb, 0, bb.limit(), types);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  public ByteBuffer nioBuffer() {
+    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
+  }
+
+  public HeapTuple toHeapTuple() {
+    byte [] bytes = new byte[length];
+    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, types);
+  }
+
+  public void copyFrom(UnSafeTuple tuple) {
+    Preconditions.checkNotNull(tuple);
+
+    ((ByteBuffer) bb).clear();
+    if (length < tuple.length) {
+      UnsafeUtil.free((ByteBuffer) bb);
+      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+      this.relativePos = 0;
+      this.length = tuple.length;
+    }
+
+    ((ByteBuffer) bb).put(tuple.nioBuffer());
+  }
+
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    if (fieldOffset == -1) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return bb.address() + relativePos + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt4(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return UNSAFE.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return new String(bytes);
+  }
+
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  public abstract void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+        /*
+         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+         * time is no slower than comparing 4 bytes at a time even on 32-bit.
+         * On the other hand, it is substantially faster on 64-bit.
+         */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Use binary search
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class ZeroCopyTuple extends UnSafeTuple {
+
+  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    super.set(bb, relativePos, length, types);
+  }
+
+  @Override
+  public void release() {
+    // nothing to do
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
new file mode 100644
index 0000000..f5c8a08
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.index";
+option java_outer_classname = "IndexProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message TupleComparatorProto {
+  required SchemaProto schema = 1;
+  repeated SortSpecProto sortSpecs = 2;
+  repeated TupleComparatorSpecProto compSpecs = 3;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
new file mode 100644
index 0000000..67033ed
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+  <!-- Storage Manager Configuration -->
+  <property>
+    <name>tajo.storage.manager.hdfs.class</name>
+    <value>org.apache.tajo.storage.FileStorageManager</value>
+  </property>
+  <property>
+    <name>tajo.storage.manager.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.manager.concurrency.perDisk</name>
+    <value>1</value>
+    <description></description>
+  </property>
+
+  <!--- Registered Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler</name>
+    <value>textfile,csv,json,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+  </property>
+
+  <!--- Fragment Class Configurations -->
+  <property>
+    <name>tajo.storage.fragment.textfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.csv.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.json.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.raw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.rcfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.row.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.parquet.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.sequencefile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+  </property>
+
+  <!--- Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
+  </property>
+  
+  <!--- Appender Handler -->
+  <property>
+    <name>tajo.storage.appender-handler</name>
+    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.json.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.hfile.class</name>
+    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
new file mode 100644
index 0000000..0251dc7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFrameTuple {
+  private Tuple tuple1;
+  private Tuple tuple2;
+
+  @Before
+  public void setUp() throws Exception {
+    tuple1 = new VTuple(11);
+    tuple1.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar('9'),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1")
+    });
+    
+    tuple2 = new VTuple(11);
+    tuple2.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar('9'),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1")
+    });
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testFrameTuple() {
+    Tuple frame = new FrameTuple(tuple1, tuple2);
+    assertEquals(22, frame.size());
+    for (int i = 0; i < 22; i++) {
+      assertTrue(frame.contains(i));
+    }
+    
+    assertEquals(DatumFactory.createInt8(23l), frame.get(5));
+    assertEquals(DatumFactory.createInt8(23l), frame.get(16));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
+  }
+}