You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:27 UTC

[12/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
new file mode 100644
index 0000000..14e67b2
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.util.FileUtil;
+
+/**
+ * It specifies the maximum size or increasing ratio. In addition,
+ * it guarantees that all numbers are less than or equal to Integer.MAX_VALUE 2^31
+ * due to ByteBuffer.
+ */
+public class ResizableLimitSpec {
+  private final Log LOG = LogFactory.getLog(ResizableLimitSpec.class);
+
+  public static final int MAX_SIZE_BYTES = Integer.MAX_VALUE;
+  public static final ResizableLimitSpec DEFAULT_LIMIT = new ResizableLimitSpec(Integer.MAX_VALUE);
+
+  private final long initSize;
+  private final long limitBytes;
+  private final float incRatio;
+  private final float allowedOVerflowRatio;
+  private final static float DEFAULT_ALLOWED_OVERFLOW_RATIO = 0.1f;
+  private final static float DEFAULT_INCREASE_RATIO = 1.0f;
+
+  public ResizableLimitSpec(long initSize) {
+    this(initSize, MAX_SIZE_BYTES, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes) {
+    this(initSize, limitBytes, DEFAULT_ALLOWED_OVERFLOW_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflow) {
+    this(initSize, limitBytes, allowedOverflow, DEFAULT_INCREASE_RATIO);
+  }
+
+  public ResizableLimitSpec(long initSize, long limitBytes, float allowedOverflowRatio, float incRatio) {
+    Preconditions.checkArgument(initSize > 0, "initial size must be greater than 0 bytes.");
+    Preconditions.checkArgument(initSize <= MAX_SIZE_BYTES, "The maximum initial size is 2GB.");
+    Preconditions.checkArgument(limitBytes > 0, "The limit size must be greater than 0 bytes.");
+    Preconditions.checkArgument(limitBytes <= MAX_SIZE_BYTES, "The maximum limit size is 2GB.");
+    Preconditions.checkArgument(incRatio > 0.0f, "Increase Ratio must be greater than 0.");
+
+    if (initSize == limitBytes) {
+      long overflowedSize = (long) (initSize + (initSize * allowedOverflowRatio));
+
+      if (overflowedSize > Integer.MAX_VALUE) {
+        overflowedSize = Integer.MAX_VALUE;
+      }
+
+      this.initSize = overflowedSize;
+      this.limitBytes = overflowedSize;
+    } else {
+      this.initSize = initSize;
+      limitBytes = (long) (limitBytes + (limitBytes * allowedOverflowRatio));
+
+      if (limitBytes > Integer.MAX_VALUE) {
+        this.limitBytes = Integer.MAX_VALUE;
+      } else {
+        this.limitBytes = limitBytes;
+      }
+    }
+
+    this.allowedOVerflowRatio = allowedOverflowRatio;
+    this.incRatio = incRatio;
+  }
+
+  public long initialSize() {
+    return initSize;
+  }
+
+  public long limit() {
+    return limitBytes;
+  }
+
+  public float remainRatio(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    if (currentSize > Integer.MAX_VALUE) {
+      currentSize = Integer.MAX_VALUE;
+    }
+    return (float)currentSize / (float)limitBytes;
+  }
+
+  public boolean canIncrease(long currentSize) {
+    return remain(currentSize) > 0;
+  }
+
+  public long remain(long currentSize) {
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
+  }
+
+  public int increasedSize(int currentSize) {
+    if (currentSize < initSize) {
+      return (int) initSize;
+    }
+
+    if (currentSize > Integer.MAX_VALUE) {
+      LOG.warn("Current size already exceeds the maximum size (" + Integer.MAX_VALUE + " bytes)");
+      return Integer.MAX_VALUE;
+    }
+    long nextSize = (long) (currentSize + ((float) currentSize * incRatio));
+
+    if (nextSize > limitBytes) {
+      LOG.info("Increasing reaches size limit (" + FileUtil.humanReadableByteCount(limitBytes, false) + ")");
+      nextSize = limitBytes;
+    }
+
+    if (nextSize > Integer.MAX_VALUE) {
+      LOG.info("Increasing reaches maximum size (" + FileUtil.humanReadableByteCount(Integer.MAX_VALUE, false) + ")");
+      nextSize = Integer.MAX_VALUE;
+    }
+
+    return (int) nextSize;
+  }
+
+  @Override
+  public String toString() {
+    return "init=" + FileUtil.humanReadableByteCount(initSize, false) + ",limit="
+        + FileUtil.humanReadableByteCount(limitBytes, false) + ",overflow_ratio=" + allowedOVerflowRatio
+        + ",inc_ratio=" + incRatio;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
new file mode 100644
index 0000000..a2b2561
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/RowWriter.java
@@ -0,0 +1,73 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.IntervalDatum;
+import org.apache.tajo.datum.ProtobufDatum;
+
+/**
+ * The call sequence should be as follows:
+ *
+ * <pre>
+ *   startRow() -->  skipField() or putXXX --> endRow()
+ * </pre>
+ *
+ * The total number of skipField and putXXX invocations must be equivalent to the number of fields.
+ */
+public interface RowWriter {
+
+  public TajoDataTypes.DataType [] dataTypes();
+
+  public boolean startRow();
+
+  public void endRow();
+
+  public void skipField();
+
+  public void putBool(boolean val);
+
+  public void putInt2(short val);
+
+  public void putInt4(int val);
+
+  public void putInt8(long val);
+
+  public void putFloat4(float val);
+
+  public void putFloat8(double val);
+
+  public void putText(String val);
+
+  public void putText(byte[] val);
+
+  public void putBlob(byte[] val);
+
+  public void putTimestamp(long val);
+
+  public void putTime(long val);
+
+  public void putDate(int val);
+
+  public void putInterval(IntervalDatum val);
+
+  public void putInet4(int val);
+
+  public void putProtoDatum(ProtobufDatum datum);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
new file mode 100644
index 0000000..b742e6d
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTuple.java
@@ -0,0 +1,311 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+
+import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.UnsafeUtil;
+
+import sun.misc.Unsafe;
+import sun.nio.ch.DirectBuffer;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.Charset;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public abstract class UnSafeTuple implements Tuple {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  private DirectBuffer bb;
+  private int relativePos;
+  private int length;
+  private DataType [] types;
+
+  protected void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    this.bb = (DirectBuffer) bb;
+    this.relativePos = relativePos;
+    this.length = length;
+    this.types = types;
+  }
+
+  void set(ByteBuffer bb, DataType [] types) {
+    set(bb, 0, bb.limit(), types);
+  }
+
+  @Override
+  public int size() {
+    return types.length;
+  }
+
+  public ByteBuffer nioBuffer() {
+    return ((ByteBuffer)((ByteBuffer)bb).duplicate().position(relativePos).limit(relativePos + length)).slice();
+  }
+
+  public HeapTuple toHeapTuple() {
+    byte [] bytes = new byte[length];
+    UNSAFE.copyMemory(null, bb.address() + relativePos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, length);
+    return new HeapTuple(bytes, types);
+  }
+
+  public void copyFrom(UnSafeTuple tuple) {
+    Preconditions.checkNotNull(tuple);
+
+    ((ByteBuffer) bb).clear();
+    if (length < tuple.length) {
+      UnsafeUtil.free((ByteBuffer) bb);
+      bb = (DirectBuffer) ByteBuffer.allocateDirect(tuple.length).order(ByteOrder.nativeOrder());
+      this.relativePos = 0;
+      this.length = tuple.length;
+    }
+
+    ((ByteBuffer) bb).put(tuple.nioBuffer());
+  }
+
+  private int getFieldOffset(int fieldId) {
+    return UNSAFE.getInt(bb.address() + relativePos + SizeOf.SIZE_OF_INT + (fieldId * SizeOf.SIZE_OF_INT));
+  }
+
+  public long getFieldAddr(int fieldId) {
+    int fieldOffset = getFieldOffset(fieldId);
+    if (fieldOffset == -1) {
+      throw new RuntimeException("Invalid Field Access: " + fieldId);
+    }
+    return bb.address() + relativePos + fieldOffset;
+  }
+
+  @Override
+  public boolean contains(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNull(int fieldid) {
+    return getFieldOffset(fieldid) == OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public boolean isNotNull(int fieldid) {
+    return getFieldOffset(fieldid) > OffHeapRowBlock.NULL_FIELD_OFFSET;
+  }
+
+  @Override
+  public void clear() {
+    // nothing to do
+  }
+
+  @Override
+  public void put(int fieldId, Datum value) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum).");
+  }
+
+  @Override
+  public void put(int fieldId, Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Datum []).");
+  }
+
+  @Override
+  public void put(int fieldId, Tuple tuple) {
+    throw new UnsupportedException("UnSafeTuple does not support put(int, Tuple).");
+  }
+
+  @Override
+  public void put(Datum[] values) {
+    throw new UnsupportedException("UnSafeTuple does not support put(Datum []).");
+  }
+
+  @Override
+  public Datum get(int fieldId) {
+    if (isNull(fieldId)) {
+      return NullDatum.get();
+    }
+
+    switch (types[fieldId].getType()) {
+    case BOOLEAN:
+      return DatumFactory.createBool(getBool(fieldId));
+    case INT1:
+    case INT2:
+      return DatumFactory.createInt2(getInt2(fieldId));
+    case INT4:
+      return DatumFactory.createInt4(getInt4(fieldId));
+    case INT8:
+      return DatumFactory.createInt8(getInt4(fieldId));
+    case FLOAT4:
+      return DatumFactory.createFloat4(getFloat4(fieldId));
+    case FLOAT8:
+      return DatumFactory.createFloat8(getFloat8(fieldId));
+    case TEXT:
+      return DatumFactory.createText(getText(fieldId));
+    case TIMESTAMP:
+      return DatumFactory.createTimestamp(getInt8(fieldId));
+    case DATE:
+      return DatumFactory.createDate(getInt4(fieldId));
+    case TIME:
+      return DatumFactory.createTime(getInt8(fieldId));
+    case INTERVAL:
+      return getInterval(fieldId);
+    case INET4:
+      return DatumFactory.createInet4(getInt4(fieldId));
+    case PROTOBUF:
+      return getProtobufDatum(fieldId);
+    default:
+      throw new UnsupportedException("Unknown type: " + types[fieldId]);
+    }
+  }
+
+  @Override
+  public void setOffset(long offset) {
+  }
+
+  @Override
+  public long getOffset() {
+    return 0;
+  }
+
+  @Override
+  public boolean getBool(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId)) == 0x01;
+  }
+
+  @Override
+  public byte getByte(int fieldId) {
+    return UNSAFE.getByte(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public char getChar(int fieldId) {
+    return UNSAFE.getChar(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public byte[] getBytes(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return bytes;
+  }
+
+  @Override
+  public short getInt2(int fieldId) {
+    long addr = getFieldAddr(fieldId);
+    return UNSAFE.getShort(addr);
+  }
+
+  @Override
+  public int getInt4(int fieldId) {
+    return UNSAFE.getInt(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public long getInt8(int fieldId) {
+    return UNSAFE.getLong(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public float getFloat4(int fieldId) {
+    return UNSAFE.getFloat(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public double getFloat8(int fieldId) {
+    return UNSAFE.getDouble(getFieldAddr(fieldId));
+  }
+
+  @Override
+  public String getText(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return new String(bytes);
+  }
+
+  public IntervalDatum getInterval(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int months = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+    long millisecs = UNSAFE.getLong(pos);
+    return new IntervalDatum(months, millisecs);
+  }
+
+  @Override
+  public Datum getProtobufDatum(int fieldId) {
+    byte [] bytes = getBytes(fieldId);
+
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(types[fieldId].getCode());
+    Message.Builder builder = factory.newBuilder();
+    try {
+      builder.mergeFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      return NullDatum.get();
+    }
+
+    return new ProtobufDatum(builder.build());
+  }
+
+  @Override
+  public char[] getUnicodeChars(int fieldId) {
+    long pos = getFieldAddr(fieldId);
+    int len = UNSAFE.getInt(pos);
+    pos += SizeOf.SIZE_OF_INT;
+
+    byte [] bytes = new byte[len];
+    UNSAFE.copyMemory(null, pos, bytes, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET, len);
+    return StringUtils.convertBytesToChars(bytes, Charset.forName("UTF-8"));
+  }
+
+  @Override
+  public Tuple clone() throws CloneNotSupportedException {
+    return toHeapTuple();
+  }
+
+  @Override
+  public Datum[] getValues() {
+    Datum [] datums = new Datum[size()];
+    for (int i = 0; i < size(); i++) {
+      if (contains(i)) {
+        datums[i] = get(i);
+      } else {
+        datums[i] = NullDatum.get();
+      }
+    }
+    return datums;
+  }
+
+  @Override
+  public String toString() {
+    return VTuple.toDisplayString(getValues());
+  }
+
+  public abstract void release();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
new file mode 100644
index 0000000..73e1e2f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/UnSafeTupleBytesComparator.java
@@ -0,0 +1,99 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedLongs;
+import org.apache.tajo.util.SizeOf;
+import org.apache.tajo.util.UnsafeUtil;
+import sun.misc.Unsafe;
+
+import java.nio.ByteOrder;
+
+/**
+ * It directly access UTF bytes in UnSafeTuple without any copy. It is used by compiled TupleComparator.
+ */
+public class UnSafeTupleBytesComparator {
+  private static final Unsafe UNSAFE = UnsafeUtil.unsafe;
+
+  static final boolean littleEndian =
+      ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+
+  public static int compare(long ptr1, long ptr2) {
+    int lstrLen = UNSAFE.getInt(ptr1);
+    int rstrLen = UNSAFE.getInt(ptr2);
+
+    ptr1 += SizeOf.SIZE_OF_INT;
+    ptr2 += SizeOf.SIZE_OF_INT;
+
+    int minLength = Math.min(lstrLen, rstrLen);
+    int minWords = minLength / Longs.BYTES;
+
+        /*
+         * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
+         * time is no slower than comparing 4 bytes at a time even on 32-bit.
+         * On the other hand, it is substantially faster on 64-bit.
+         */
+    for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
+      long lw = UNSAFE.getLong(ptr1);
+      long rw = UNSAFE.getLong(ptr2);
+      long diff = lw ^ rw;
+
+      if (diff != 0) {
+        if (!littleEndian) {
+          return UnsignedLongs.compare(lw, rw);
+        }
+
+        // Use binary search
+        int n = 0;
+        int y;
+        int x = (int) diff;
+        if (x == 0) {
+          x = (int) (diff >>> 32);
+          n = 32;
+        }
+
+        y = x << 16;
+        if (y == 0) {
+          n += 16;
+        } else {
+          x = y;
+        }
+
+        y = x << 8;
+        if (y == 0) {
+          n += 8;
+        }
+        return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL));
+      }
+
+      ptr1 += SizeOf.SIZE_OF_LONG;
+      ptr2 += SizeOf.SIZE_OF_LONG;
+    }
+
+    // The epilogue to cover the last (minLength % 8) elements.
+    for (int i = minWords * Longs.BYTES; i < minLength; i++) {
+      int result = UNSAFE.getByte(ptr1++) - UNSAFE.getByte(ptr2++);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return lstrLen - rstrLen;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
new file mode 100644
index 0000000..51dbb29
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/tuple/offheap/ZeroCopyTuple.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.tajo.common.TajoDataTypes.DataType;
+
+public class ZeroCopyTuple extends UnSafeTuple {
+
+  public void set(ByteBuffer bb, int relativePos, int length, DataType [] types) {
+    super.set(bb, relativePos, length, types);
+  }
+
+  @Override
+  public void release() {
+    // nothing to do
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
new file mode 100644
index 0000000..f5c8a08
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/proto/IndexProtos.proto
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.index";
+option java_outer_classname = "IndexProtos";
+option optimize_for = SPEED;
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+
+import "CatalogProtos.proto";
+
+message TupleComparatorProto {
+  required SchemaProto schema = 1;
+  repeated SortSpecProto sortSpecs = 2;
+  repeated TupleComparatorSpecProto compSpecs = 3;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
new file mode 100644
index 0000000..47d11c7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -0,0 +1,184 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+  <!-- Storage Manager Configuration -->
+  <property>
+    <name>tajo.storage.manager.hdfs.class</name>
+    <value>org.apache.tajo.storage.FileStorageManager</value>
+  </property>
+  <property>
+    <name>tajo.storage.manager.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.manager.concurrency.perDisk</name>
+    <value>1</value>
+    <description></description>
+  </property>
+
+  <!--- Registered Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler</name>
+    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+  </property>
+
+  <!--- Fragment Class Configurations -->
+  <property>
+    <name>tajo.storage.fragment.textfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.csv.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.raw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.rcfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.row.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.parquet.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.sequencefile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseFragment</value>
+  </property>
+
+  <!--- Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseScanner</value>
+  </property>
+  
+  <!--- Appender Handler -->
+  <property>
+    <name>tajo.storage.appender-handler</name>
+    <value>textfile,csv,raw,rcfile,row,parquet,sequencefile,avro,hbase</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.textfile.class</name>
+    <value>org.apache.tajo.storage.text.DelimitedTextFile$DelimitedTextFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.hfile.class</name>
+    <value>org.apache.tajo.storage.hbase.HFileAppender</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
new file mode 100644
index 0000000..0251dc7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestFrameTuple.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestFrameTuple {
+  private Tuple tuple1;
+  private Tuple tuple2;
+
+  @Before
+  public void setUp() throws Exception {
+    tuple1 = new VTuple(11);
+    tuple1.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar('9'),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1")
+    });
+    
+    tuple2 = new VTuple(11);
+    tuple2.put(new Datum[] {
+        DatumFactory.createBool(true),
+        DatumFactory.createBit((byte) 0x99),
+        DatumFactory.createChar('9'),
+        DatumFactory.createInt2((short) 17),
+        DatumFactory.createInt4(59),
+        DatumFactory.createInt8(23l),
+        DatumFactory.createFloat4(77.9f),
+        DatumFactory.createFloat8(271.9f),
+        DatumFactory.createText("hyunsik"),
+        DatumFactory.createBlob("hyunsik".getBytes()),
+        DatumFactory.createInet4("192.168.0.1")
+    });
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testFrameTuple() {
+    Tuple frame = new FrameTuple(tuple1, tuple2);
+    assertEquals(22, frame.size());
+    for (int i = 0; i < 22; i++) {
+      assertTrue(frame.contains(i));
+    }
+    
+    assertEquals(DatumFactory.createInt8(23l), frame.get(5));
+    assertEquals(DatumFactory.createInt8(23l), frame.get(16));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(10));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), frame.get(21));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
new file mode 100644
index 0000000..c6149f7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.BytesUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestLazyTuple {
+
+  Schema schema;
+  byte[][] textRow;
+  byte[] nullbytes;
+  SerializerDeserializer serde;
+
+  @Before
+  public void setUp() {
+    nullbytes = "\\N".getBytes();
+
+    schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col2", TajoDataTypes.Type.BIT);
+    schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
+    schema.addColumn("col4", TajoDataTypes.Type.INT2);
+    schema.addColumn("col5", TajoDataTypes.Type.INT4);
+    schema.addColumn("col6", TajoDataTypes.Type.INT8);
+    schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col9", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col10", TajoDataTypes.Type.BLOB);
+    schema.addColumn("col11", TajoDataTypes.Type.INET4);
+    schema.addColumn("col12", TajoDataTypes.Type.INT4);
+    schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(DatumFactory.createBool(true)).append('|');
+    sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
+    sb.append(DatumFactory.createChar("str")).append('|');
+    sb.append(DatumFactory.createInt2((short) 17)).append('|');
+    sb.append(DatumFactory.createInt4(59)).append('|');
+    sb.append(DatumFactory.createInt8(23l)).append('|');
+    sb.append(DatumFactory.createFloat4(77.9f)).append('|');
+    sb.append(DatumFactory.createFloat8(271.9f)).append('|');
+    sb.append(DatumFactory.createText("str2")).append('|');
+    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
+    sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
+    sb.append(new String(nullbytes)).append('|');
+    sb.append(NullDatum.get());
+    textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+    serde = new TextSerializerDeserializer();
+  }
+
+  @Test
+  public void testGetDatum() {
+
+    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
+    assertEquals(DatumFactory.createBool(true), t1.get(0));
+    assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
+    assertEquals(DatumFactory.createChar("str"), t1.get(2));
+    assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
+    assertEquals(DatumFactory.createInt4(59), t1.get(4));
+    assertEquals(DatumFactory.createInt8(23l), t1.get(5));
+    assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
+    assertEquals(DatumFactory.createFloat8(271.9f), t1.get(7));
+    assertEquals(DatumFactory.createText("str2"), t1.get(8));
+    assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
+    assertEquals(NullDatum.get(), t1.get(11));
+    assertEquals(NullDatum.get(), t1.get(12));
+  }
+
+  @Test
+  public void testContain() {
+    int colNum = schema.size();
+
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(3, DatumFactory.createInt4(1));
+    t1.put(7, DatumFactory.createInt4(1));
+
+    assertTrue(t1.contains(0));
+    assertFalse(t1.contains(1));
+    assertFalse(t1.contains(2));
+    assertTrue(t1.contains(3));
+    assertFalse(t1.contains(4));
+    assertFalse(t1.contains(5));
+    assertFalse(t1.contains(6));
+    assertTrue(t1.contains(7));
+    assertFalse(t1.contains(8));
+    assertFalse(t1.contains(9));
+    assertFalse(t1.contains(10));
+    assertFalse(t1.contains(11));
+    assertFalse(t1.contains(12));
+  }
+
+  @Test
+  public void testPut() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createText("str"));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(11, DatumFactory.createFloat4(0.76f));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+
+    assertEquals(t1.getText(0), "str");
+    assertEquals(t1.get(1).asInt4(), 2);
+    assertTrue(t1.get(11).asFloat4() == 0.76f);
+  }
+
+  @Test
+  public void testEquals() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+
+    assertEquals(t1, t2);
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    assertEquals(t1, t3);
+    assertEquals(t2, t3);
+
+    LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
+    assertNotSame(t1, t4);
+  }
+
+  @Test
+  public void testHashCode() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("str"));
+
+    assertEquals(t1.hashCode(), t2.hashCode());
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createText("str"));
+    assertEquals(t1.hashCode(), t3.hashCode());
+    assertEquals(t2.hashCode(), t3.hashCode());
+
+    Tuple t4 = new VTuple(5);
+    t4.put(0, DatumFactory.createInt4(1));
+    t4.put(1, DatumFactory.createInt4(2));
+    t4.put(4, DatumFactory.createInt4(2));
+
+    assertNotSame(t1.hashCode(), t4.hashCode());
+  }
+
+  @Test
+  public void testPutTuple() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+
+    Schema schema2 = new Schema();
+    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
+    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
+
+    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i + 1, t1.get(i).asInt4());
+    }
+  }
+
+  @Test
+  public void testInvalidNumber() {
+    byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
+    Schema schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.INT2);
+    schema.addColumn("col2", TajoDataTypes.Type.INT4);
+    schema.addColumn("col3", TajoDataTypes.Type.INT8);
+    schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
+
+    LazyTuple tuple = new LazyTuple(schema, bytes, 0);
+    assertEquals(bytes.length, tuple.size());
+
+    for (int i = 0; i < tuple.size(); i++){
+      assertEquals(NullDatum.get(), tuple.get(i));
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    LazyTuple t2 = (LazyTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
new file mode 100644
index 0000000..639ca04
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTupleComparator {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testCompare() {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.INT4);
+    schema.addColumn("col3", Type.INT4);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.TEXT);
+    
+    Tuple tuple1 = new VTuple(5);
+    Tuple tuple2 = new VTuple(5);
+
+    tuple1.put(
+        new Datum[] {
+        DatumFactory.createInt4(9),
+        DatumFactory.createInt4(3),
+        DatumFactory.createInt4(33),
+        DatumFactory.createInt4(4),
+        DatumFactory.createText("abc")});
+    tuple2.put(
+        new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(25),
+        DatumFactory.createInt4(109),
+        DatumFactory.createInt4(4),
+        DatumFactory.createText("abd")});
+
+    SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false);
+    SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false);
+
+    BaseTupleComparator tc = new BaseTupleComparator(schema,
+        new SortSpec[] {sortKey1, sortKey2});
+    assertEquals(-1, tc.compare(tuple1, tuple2));
+    assertEquals(1, tc.compare(tuple2, tuple1));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
new file mode 100644
index 0000000..1bbd9ec
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestVTuple {
+
+	/**
+	 * @throws Exception
+	 */
+	@Before
+	public void setUp() throws Exception {
+		
+	}
+	
+	@Test
+	public void testContain() {
+		VTuple t1 = new VTuple(260);
+		t1.put(0, DatumFactory.createInt4(1));
+		t1.put(1, DatumFactory.createInt4(1));
+		t1.put(27, DatumFactory.createInt4(1));
+		t1.put(96, DatumFactory.createInt4(1));
+		t1.put(257, DatumFactory.createInt4(1));
+		
+		assertTrue(t1.contains(0));
+		assertTrue(t1.contains(1));
+		assertFalse(t1.contains(2));
+		assertFalse(t1.contains(3));
+		assertFalse(t1.contains(4));
+		assertTrue(t1.contains(27));
+		assertFalse(t1.contains(28));
+		assertFalse(t1.contains(95));
+		assertTrue(t1.contains(96));
+		assertFalse(t1.contains(97));
+		assertTrue(t1.contains(257));
+	}
+	
+	@Test
+	public void testPut() {
+		VTuple t1 = new VTuple(260);
+		t1.put(0, DatumFactory.createText("str"));
+		t1.put(1, DatumFactory.createInt4(2));
+		t1.put(257, DatumFactory.createFloat4(0.76f));
+		
+		assertTrue(t1.contains(0));
+		assertTrue(t1.contains(1));
+		
+		assertEquals(t1.getText(0),"str");
+		assertEquals(t1.get(1).asInt4(),2);
+		assertTrue(t1.get(257).asFloat4() == 0.76f);
+	}
+
+  @Test
+	public void testEquals() {
+	  Tuple t1 = new VTuple(5);
+	  Tuple t2 = new VTuple(5);
+	  
+	  t1.put(0, DatumFactory.createInt4(1));
+	  t1.put(1, DatumFactory.createInt4(2));
+	  t1.put(3, DatumFactory.createInt4(2));
+	  
+	  t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    
+    assertEquals(t1,t2);
+    
+    Tuple t3 = new VTuple(5);
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createInt4(2));
+    
+    assertNotSame(t1,t3);
+	}
+	
+	@Test
+	public void testHashCode() {
+	  Tuple t1 = new VTuple(5);
+    Tuple t2 = new VTuple(5);
+    
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("hyunsik"));
+    
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("hyunsik"));
+    
+    assertEquals(t1.hashCode(),t2.hashCode());
+    
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+    
+    assertNotSame(t1.hashCode(),t3.hashCode());
+	}
+
+  @Test
+  public void testPutTuple() {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+    Tuple t2 = new VTuple(2);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i+1, t1.get(i).asInt4());
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    VTuple t2 = (VTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
new file mode 100644
index 0000000..d1c561b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+  <property>
+    <name>fs.s3.impl</name>
+    <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+  </property>
+
+  <!-- Storage Manager Configuration -->
+  <property>
+    <name>tajo.storage.manager.hdfs.class</name>
+    <value>org.apache.tajo.storage.FileStorageManager</value>
+  </property>
+  <property>
+    <name>tajo.storage.manager.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+  </property>
+
+  <!--- Registered Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler</name>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+  </property>
+
+  <!--- Fragment Class Configurations -->
+  <property>
+    <name>tajo.storage.fragment.csv.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.raw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.rcfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.row.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.trevni.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.parquet.class</name>
+    <value>org.apache.tajo.storage.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.sequencefile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+
+  <!--- Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <!--- Appender Handler -->
+  <property>
+    <name>tajo.storage.appender-handler</name>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
+</configuration>

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml
new file mode 100644
index 0000000..e37149d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/pom.xml
@@ -0,0 +1,349 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright 2012 Database Lab., Korea Univ.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-hbase</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo HBase Storage</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+      </url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test>TRUE</tajo.test>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-api</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-json</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-server-tests</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-app</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-api</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-hs</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
new file mode 100644
index 0000000..8615235
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.TUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class for HBase appender.
+ */
+public abstract class AbstractHBaseAppender implements Appender {
+  protected Configuration conf;
+  protected Schema schema;
+  protected TableMeta meta;
+  protected QueryUnitAttemptId taskAttemptId;
+  protected Path stagingDir;
+  protected boolean inited = false;
+
+  protected ColumnMapping columnMapping;
+  protected TableStatistics stats;
+  protected boolean enabledStats;
+
+  protected int columnNum;
+
+  protected byte[][][] mappingColumnFamilies;
+  protected boolean[] isBinaryColumns;
+  protected boolean[] isRowKeyMappings;
+  protected boolean[] isColumnKeys;
+  protected boolean[] isColumnValues;
+  protected int[] rowKeyFieldIndexes;
+  protected int[] rowkeyColumnIndexes;
+  protected char rowKeyDelimiter;
+
+  // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping
+  protected int[] columnKeyValueDataIndexes;
+  protected byte[][] columnKeyDatas;
+  protected byte[][] columnValueDatas;
+  protected byte[][] columnKeyCfNames;
+
+  protected KeyValue[] keyValues;
+
+  public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                       Schema schema, TableMeta meta, Path stagingDir) {
+    this.conf = conf;
+    this.schema = schema;
+    this.meta = meta;
+    this.stagingDir = stagingDir;
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  @Override
+  public void init() throws IOException {
+    if (inited) {
+      throw new IllegalStateException("FileAppender is already initialized.");
+    }
+    inited = true;
+    if (enabledStats) {
+      stats = new TableStatistics(this.schema);
+    }
+    columnMapping = new ColumnMapping(schema, meta);
+
+    mappingColumnFamilies = columnMapping.getMappingColumns();
+
+    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+    List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
+    for (int i = 0; i < isRowKeyMappings.length; i++) {
+      if (isRowKeyMappings[i]) {
+        rowkeyColumnIndexList.add(i);
+      }
+    }
+    rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);
+
+    isBinaryColumns = columnMapping.getIsBinaryColumns();
+    isColumnKeys = columnMapping.getIsColumnKeys();
+    isColumnValues = columnMapping.getIsColumnValues();
+    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
+    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
+
+    this.columnNum = schema.size();
+
+    // In the case of '<cfname>:key:' or '<cfname>:value:' KeyValue object should be set with the qualifier and value
+    // which are mapped to the same column family.
+    columnKeyValueDataIndexes = new int[isColumnKeys.length];
+    int index = 0;
+    int numKeyValues = 0;
+    Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
+    for (int i = 0; i < isColumnKeys.length; i++) {
+      if (isRowKeyMappings[i]) {
+        continue;
+      }
+      if (isColumnKeys[i] || isColumnValues[i]) {
+        String cfName = new String(mappingColumnFamilies[i][0]);
+        if (!cfNameIndexMap.containsKey(cfName)) {
+          cfNameIndexMap.put(cfName, index);
+          columnKeyValueDataIndexes[i] = index;
+          index++;
+          numKeyValues++;
+        } else {
+          columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
+        }
+      } else {
+        numKeyValues++;
+      }
+    }
+    columnKeyCfNames = new byte[cfNameIndexMap.size()][];
+    for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) {
+      columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
+    }
+    columnKeyDatas = new byte[cfNameIndexMap.size()][];
+    columnValueDatas = new byte[cfNameIndexMap.size()][];
+
+    keyValues = new KeyValue[numKeyValues];
+  }
+
+  private ByteArrayOutputStream bout = new ByteArrayOutputStream();
+
+  protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
+    Datum datum;
+    byte[] rowkey;
+    if (rowkeyColumnIndexes.length > 1) {
+      bout.reset();
+      for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
+        datum = tuple.get(rowkeyColumnIndexes[i]);
+        if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
+          rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+        } else {
+          rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+        }
+        bout.write(rowkey);
+        if (i < rowkeyColumnIndexes.length - 1) {
+          bout.write(rowKeyDelimiter);
+        }
+      }
+      rowkey = bout.toByteArray();
+    } else {
+      int index = rowkeyColumnIndexes[0];
+      datum = tuple.get(index);
+      if (isBinaryColumns[index]) {
+        rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
+      } else {
+        rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
+      }
+    }
+
+    return rowkey;
+  }
+
+  protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
+    int keyValIndex = 0;
+    for (int i = 0; i < columnNum; i++) {
+      if (isRowKeyMappings[i]) {
+        continue;
+      }
+      Datum datum = tuple.get(i);
+      byte[] value;
+      if (isBinaryColumns[i]) {
+        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+      } else {
+        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+      }
+
+      if (isColumnKeys[i]) {
+        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
+      } else if (isColumnValues[i]) {
+        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
+      } else {
+        keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
+        keyValIndex++;
+      }
+    }
+
+    for (int i = 0; i < columnKeyDatas.length; i++) {
+      keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
+    }
+  }
+
+  @Override
+  public void enableStats() {
+    enabledStats = true;
+  }
+
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
new file mode 100644
index 0000000..79161cc
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.SortNode;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.logical.UnaryNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+
+public class AddSortForInsertRewriter implements RewriteRule {
+  private int[] sortColumnIndexes;
+  private Column[] sortColumns;
+  public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
+    this.sortColumns = sortColumns;
+    this.sortColumnIndexes = new int[sortColumns.length];
+
+    Schema tableSchema = tableDesc.getSchema();
+    for (int i = 0; i < sortColumns.length; i++) {
+      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "AddSortForInsertRewriter";
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    StoreType storeType = PlannerUtil.getStoreType(plan);
+    return storeType != null;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    UnaryNode insertNode = rootNode.getChild();
+    LogicalNode childNode = insertNode.getChild();
+
+    Schema sortSchema = childNode.getOutSchema();
+    SortNode sortNode = plan.createNode(SortNode.class);
+    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+    sortNode.setInSchema(sortSchema);
+    sortNode.setOutSchema(sortSchema);
+
+    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+    int index = 0;
+
+    for (int i = 0; i < sortColumnIndexes.length; i++) {
+      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+      if (sortColumn == null) {
+        throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
+      }
+      sortSpecs[index++] = new SortSpec(sortColumn, true, true);
+    }
+    sortNode.setSortSpecs(sortSpecs);
+
+    sortNode.setChild(insertNode.getChild());
+    insertNode.setChild(sortNode);
+    plan.getRootBlock().registerNode(sortNode);
+
+    return plan;
+  }
+}