Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:33 UTC

[19/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
new file mode 100644
index 0000000..c6149f7
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestLazyTuple.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.BytesUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestLazyTuple {
+
+  Schema schema;
+  byte[][] textRow;
+  byte[] nullbytes;
+  SerializerDeserializer serde;
+
+  @Before
+  public void setUp() {
+    nullbytes = "\\N".getBytes();
+
+    schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.BOOLEAN);
+    schema.addColumn("col2", TajoDataTypes.Type.BIT);
+    schema.addColumn("col3", TajoDataTypes.Type.CHAR, 7);
+    schema.addColumn("col4", TajoDataTypes.Type.INT2);
+    schema.addColumn("col5", TajoDataTypes.Type.INT4);
+    schema.addColumn("col6", TajoDataTypes.Type.INT8);
+    schema.addColumn("col7", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col8", TajoDataTypes.Type.FLOAT8);
+    schema.addColumn("col9", TajoDataTypes.Type.TEXT);
+    schema.addColumn("col10", TajoDataTypes.Type.BLOB);
+    schema.addColumn("col11", TajoDataTypes.Type.INET4);
+    schema.addColumn("col12", TajoDataTypes.Type.INT4);
+    schema.addColumn("col13", TajoDataTypes.Type.NULL_TYPE);
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(DatumFactory.createBool(true)).append('|');
+    sb.append(new String(DatumFactory.createBit((byte) 0x99).asTextBytes())).append('|');
+    sb.append(DatumFactory.createChar("str")).append('|');
+    sb.append(DatumFactory.createInt2((short) 17)).append('|');
+    sb.append(DatumFactory.createInt4(59)).append('|');
+    sb.append(DatumFactory.createInt8(23L)).append('|');
+    sb.append(DatumFactory.createFloat4(77.9f)).append('|');
+    sb.append(DatumFactory.createFloat8(271.9)).append('|');
+    sb.append(DatumFactory.createText("str2")).append('|');
+    sb.append(DatumFactory.createBlob("jinho".getBytes())).append('|');
+    sb.append(DatumFactory.createInet4("192.168.0.1")).append('|');
+    sb.append(new String(nullbytes)).append('|');
+    sb.append(NullDatum.get());
+    textRow = BytesUtils.splitPreserveAllTokens(sb.toString().getBytes(), '|');
+    serde = new TextSerializerDeserializer();
+  }
+
+  @Test
+  public void testGetDatum() {
+
+    LazyTuple t1 = new LazyTuple(schema, textRow, -1, nullbytes, serde);
+    assertEquals(DatumFactory.createBool(true), t1.get(0));
+    assertEquals(DatumFactory.createBit((byte) 0x99), t1.get(1));
+    assertEquals(DatumFactory.createChar("str"), t1.get(2));
+    assertEquals(DatumFactory.createInt2((short) 17), t1.get(3));
+    assertEquals(DatumFactory.createInt4(59), t1.get(4));
+    assertEquals(DatumFactory.createInt8(23L), t1.get(5));
+    assertEquals(DatumFactory.createFloat4(77.9f), t1.get(6));
+    assertEquals(DatumFactory.createFloat8(271.9), t1.get(7));
+    assertEquals(DatumFactory.createText("str2"), t1.get(8));
+    assertEquals(DatumFactory.createBlob("jinho".getBytes()), t1.get(9));
+    assertEquals(DatumFactory.createInet4("192.168.0.1"), t1.get(10));
+    assertEquals(NullDatum.get(), t1.get(11));
+    assertEquals(NullDatum.get(), t1.get(12));
+  }
+
+  @Test
+  public void testContain() {
+    int colNum = schema.size();
+
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(3, DatumFactory.createInt4(1));
+    t1.put(7, DatumFactory.createInt4(1));
+
+    assertTrue(t1.contains(0));
+    assertFalse(t1.contains(1));
+    assertFalse(t1.contains(2));
+    assertTrue(t1.contains(3));
+    assertFalse(t1.contains(4));
+    assertFalse(t1.contains(5));
+    assertFalse(t1.contains(6));
+    assertTrue(t1.contains(7));
+    assertFalse(t1.contains(8));
+    assertFalse(t1.contains(9));
+    assertFalse(t1.contains(10));
+    assertFalse(t1.contains(11));
+    assertFalse(t1.contains(12));
+  }
+
+  @Test
+  public void testPut() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    t1.put(0, DatumFactory.createText("str"));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(11, DatumFactory.createFloat4(0.76f));
+
+    assertTrue(t1.contains(0));
+    assertTrue(t1.contains(1));
+
+    assertEquals("str", t1.getText(0));
+    assertEquals(2, t1.get(1).asInt4());
+    assertEquals(0.76f, t1.get(11).asFloat4(), 0.0f);
+  }
+
+  @Test
+  public void testEquals() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+
+    assertEquals(t1, t2);
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    assertEquals(t1, t3);
+    assertEquals(t2, t3);
+
+    LazyTuple t4 = new LazyTuple(schema, new byte[colNum][], -1);
+    assertFalse(t1.equals(t4));
+  }
+
+  @Test
+  public void testHashCode() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+    LazyTuple t2 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("str"));
+
+    assertEquals(t1.hashCode(), t2.hashCode());
+
+    Tuple t3 = new VTuple(colNum);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(3, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createText("str"));
+    assertEquals(t1.hashCode(), t3.hashCode());
+    assertEquals(t2.hashCode(), t3.hashCode());
+
+    Tuple t4 = new VTuple(5);
+    t4.put(0, DatumFactory.createInt4(1));
+    t4.put(1, DatumFactory.createInt4(2));
+    t4.put(4, DatumFactory.createInt4(2));
+
+    assertFalse(t1.hashCode() == t4.hashCode());
+  }
+
+  @Test
+  public void testPutTuple() {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+
+    Schema schema2 = new Schema();
+    schema2.addColumn("col1", TajoDataTypes.Type.INT8);
+    schema2.addColumn("col2", TajoDataTypes.Type.INT8);
+
+    LazyTuple t2 = new LazyTuple(schema2, new byte[schema2.size()][], -1);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i + 1, t1.get(i).asInt4());
+    }
+  }
+
+  @Test
+  public void testInvalidNumber() {
+    byte[][] bytes = BytesUtils.splitPreserveAllTokens(" 1| |2 ||".getBytes(), '|');
+    Schema schema = new Schema();
+    schema.addColumn("col1", TajoDataTypes.Type.INT2);
+    schema.addColumn("col2", TajoDataTypes.Type.INT4);
+    schema.addColumn("col3", TajoDataTypes.Type.INT8);
+    schema.addColumn("col4", TajoDataTypes.Type.FLOAT4);
+    schema.addColumn("col5", TajoDataTypes.Type.FLOAT8);
+
+    LazyTuple tuple = new LazyTuple(schema, bytes, 0);
+    assertEquals(bytes.length, tuple.size());
+
+    for (int i = 0; i < tuple.size(); i++){
+      assertEquals(NullDatum.get(), tuple.get(i));
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    int colNum = schema.size();
+    LazyTuple t1 = new LazyTuple(schema, new byte[colNum][], -1);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    LazyTuple t2 = (LazyTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}
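
The tests above hinge on LazyTuple deferring deserialization: the constructor
stores the raw byte[][] plus a SerializerDeserializer, and a column is parsed
only when it is first read. A minimal, self-contained sketch of that idea
(illustrative names only, not Tajo's actual implementation):

    import java.util.function.Function;

    // Sketch of lazy per-column deserialization: keep the raw bytes and a
    // parser, and parse at most once, on first access.
    final class LazyCell<T> {
      private final byte[] raw;
      private final Function<byte[], T> parser;
      private T value;

      LazyCell(byte[] raw, Function<byte[], T> parser) {
        this.raw = raw;
        this.parser = parser;
      }

      T get() {
        if (value == null) {
          value = parser.apply(raw);
        }
        return value;
      }

      public static void main(String[] args) {
        LazyCell<Integer> cell =
            new LazyCell<>("59".getBytes(), b -> Integer.parseInt(new String(b)));
        System.out.println(cell.get()); // prints 59; parsing happened just now
      }
    }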

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
new file mode 100644
index 0000000..639ca04
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestTupleComparator.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestTupleComparator {
+
+  @Before
+  public void setUp() throws Exception {
+  }
+
+  @After
+  public void tearDown() throws Exception {
+  }
+
+  @Test
+  public final void testCompare() {
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.INT4);
+    schema.addColumn("col2", Type.INT4);
+    schema.addColumn("col3", Type.INT4);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.TEXT);
+    
+    Tuple tuple1 = new VTuple(5);
+    Tuple tuple2 = new VTuple(5);
+
+    tuple1.put(
+        new Datum[] {
+        DatumFactory.createInt4(9),
+        DatumFactory.createInt4(3),
+        DatumFactory.createInt4(33),
+        DatumFactory.createInt4(4),
+        DatumFactory.createText("abc")});
+    tuple2.put(
+        new Datum[] {
+        DatumFactory.createInt4(1),
+        DatumFactory.createInt4(25),
+        DatumFactory.createInt4(109),
+        DatumFactory.createInt4(4),
+        DatumFactory.createText("abd")});
+
+    SortSpec sortKey1 = new SortSpec(schema.getColumn("col4"), true, false);
+    SortSpec sortKey2 = new SortSpec(schema.getColumn("col5"), true, false);
+
+    BaseTupleComparator tc = new BaseTupleComparator(schema,
+        new SortSpec[] {sortKey1, sortKey2});
+    assertEquals(-1, tc.compare(tuple1, tuple2));
+    assertEquals(1, tc.compare(tuple2, tuple1));
+  }
+}
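
testCompare passes because the first sort key ties (col4 is 4 in both tuples),
so the second key decides: "abc" sorts before "abd". A multi-key comparator
tries keys in order and returns the first non-zero comparison; a sketch with
plain JDK types (not Tajo's BaseTupleComparator):

    import java.util.Comparator;

    public class MultiKeyCompareDemo {
      public static void main(String[] args) {
        // Keys are tried in order; the first non-zero comparison wins.
        Comparator<String[]> byCol4ThenCol5 =
            Comparator.<String[], Integer>comparing(r -> Integer.valueOf(r[0]))
                .thenComparing((String[] r) -> r[1]);

        String[] tuple1 = {"4", "abc"};
        String[] tuple2 = {"4", "abd"};
        // col4 ties, so col5 decides: "abc" < "abd" gives a negative result
        System.out.println(byCol4ThenCol5.compare(tuple1, tuple2) < 0); // true
      }
    }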

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
new file mode 100644
index 0000000..1bbd9ec
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/storage/TestVTuple.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+
+import org.apache.tajo.datum.DatumFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestVTuple {
+
+	/**
+	 * @throws Exception
+	 */
+	@Before
+	public void setUp() throws Exception {
+		
+	}
+	
+	@Test
+	public void testContain() {
+		VTuple t1 = new VTuple(260);
+		t1.put(0, DatumFactory.createInt4(1));
+		t1.put(1, DatumFactory.createInt4(1));
+		t1.put(27, DatumFactory.createInt4(1));
+		t1.put(96, DatumFactory.createInt4(1));
+		t1.put(257, DatumFactory.createInt4(1));
+		
+		assertTrue(t1.contains(0));
+		assertTrue(t1.contains(1));
+		assertFalse(t1.contains(2));
+		assertFalse(t1.contains(3));
+		assertFalse(t1.contains(4));
+		assertTrue(t1.contains(27));
+		assertFalse(t1.contains(28));
+		assertFalse(t1.contains(95));
+		assertTrue(t1.contains(96));
+		assertFalse(t1.contains(97));
+		assertTrue(t1.contains(257));
+	}
+	
+	@Test
+	public void testPut() {
+		VTuple t1 = new VTuple(260);
+		t1.put(0, DatumFactory.createText("str"));
+		t1.put(1, DatumFactory.createInt4(2));
+		t1.put(257, DatumFactory.createFloat4(0.76f));
+		
+		assertTrue(t1.contains(0));
+		assertTrue(t1.contains(1));
+		
+		assertEquals("str", t1.getText(0));
+		assertEquals(2, t1.get(1).asInt4());
+		assertEquals(0.76f, t1.get(257).asFloat4(), 0.0f);
+	}
+
+  @Test
+	public void testEquals() {
+	  Tuple t1 = new VTuple(5);
+	  Tuple t2 = new VTuple(5);
+	  
+	  t1.put(0, DatumFactory.createInt4(1));
+	  t1.put(1, DatumFactory.createInt4(2));
+	  t1.put(3, DatumFactory.createInt4(2));
+	  
+	  t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    
+    assertEquals(t1,t2);
+    
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+
+    assertFalse(t1.equals(t3));
+	}
+	
+	@Test
+	public void testHashCode() {
+	  Tuple t1 = new VTuple(5);
+    Tuple t2 = new VTuple(5);
+    
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("hyunsik"));
+    
+    t2.put(0, DatumFactory.createInt4(1));
+    t2.put(1, DatumFactory.createInt4(2));
+    t2.put(3, DatumFactory.createInt4(2));
+    t2.put(4, DatumFactory.createText("hyunsik"));
+    
+    assertEquals(t1.hashCode(),t2.hashCode());
+    
+    Tuple t3 = new VTuple(5);
+    t3.put(0, DatumFactory.createInt4(1));
+    t3.put(1, DatumFactory.createInt4(2));
+    t3.put(4, DatumFactory.createInt4(2));
+    
+    assertFalse(t1.hashCode() == t3.hashCode());
+	}
+
+  @Test
+  public void testPutTuple() {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(2, DatumFactory.createInt4(3));
+
+    Tuple t2 = new VTuple(2);
+    t2.put(0, DatumFactory.createInt4(4));
+    t2.put(1, DatumFactory.createInt4(5));
+
+    t1.put(3, t2);
+
+    for (int i = 0; i < 5; i++) {
+      assertEquals(i+1, t1.get(i).asInt4());
+    }
+  }
+
+  @Test
+  public void testClone() throws CloneNotSupportedException {
+    Tuple t1 = new VTuple(5);
+
+    t1.put(0, DatumFactory.createInt4(1));
+    t1.put(1, DatumFactory.createInt4(2));
+    t1.put(3, DatumFactory.createInt4(2));
+    t1.put(4, DatumFactory.createText("str"));
+
+    VTuple t2 = (VTuple) t1.clone();
+    assertNotSame(t1, t2);
+    assertEquals(t1, t2);
+
+    assertSame(t1.get(4), t2.get(4));
+
+    t1.clear();
+    assertFalse(t1.equals(t2));
+  }
+}
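
testClone documents that clone() is shallow: the clone is a distinct tuple
object (assertNotSame) with equal contents (assertEquals), yet the Datum at
index 4 is the very same object in both tuples (assertSame). The JDK analogue
of that behavior:

    import java.util.Arrays;

    public class ShallowCloneDemo {
      public static void main(String[] args) {
        Object[] values = { "str" };
        Object[] copy = Arrays.copyOf(values, values.length); // shallow copy

        System.out.println(copy != values);       // true: distinct containers
        System.out.println(copy[0] == values[0]); // true: shared elements
      }
    }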

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
new file mode 100644
index 0000000..b332364
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/TestBaseTupleBuilder.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple;
+
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.tuple.offheap.*;
+import org.junit.Test;
+
+public class TestBaseTupleBuilder {
+
+  @Test
+  public void testBuild() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(10248);
+    OffHeapRowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+
+    HeapTuple heapTuple = null;
+    ZeroCopyTuple zcTuple = null;
+    int i = 0;
+    while(reader.next(inputTuple)) {
+      RowStoreUtil.convert(inputTuple, builder);
+
+      heapTuple = builder.buildToHeapTuple();
+      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
+
+      zcTuple = builder.buildToZeroCopyTuple();
+      TestOffHeapRowBlock.validateTupleResult(i, zcTuple);
+
+      i++;
+    }
+    rowBlock.release();
+  }
+
+  @Test
+  public void testBuildWithNull() {
+    BaseTupleBuilder builder = new BaseTupleBuilder(TestOffHeapRowBlock.schema);
+
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlockWithNull(10248);
+    OffHeapRowBlockReader reader = rowBlock.getReader();
+
+    ZeroCopyTuple inputTuple = new ZeroCopyTuple();
+
+    HeapTuple heapTuple = null;
+    ZeroCopyTuple zcTuple = null;
+    int i = 0;
+    while(reader.next(inputTuple)) {
+      RowStoreUtil.convert(inputTuple, builder);
+
+      heapTuple = builder.buildToHeapTuple();
+      TestOffHeapRowBlock.validateNullity(i, heapTuple);
+
+      zcTuple = builder.buildToZeroCopyTuple();
+      TestOffHeapRowBlock.validateNullity(i, zcTuple);
+
+      i++;
+    }
+    rowBlock.release();
+  }
+}
\ No newline at end of file
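
The round-trip above reads each source row into a ZeroCopyTuple, replays it
field-by-field into the builder via RowStoreUtil.convert, then materializes
either a heap copy or another zero-copy view. A sketch of the general builder
shape (illustrative, not Tajo's RowWriter API):

    import java.util.ArrayList;
    import java.util.List;

    final class RowBuilder {
      private final List<Object> fields = new ArrayList<>();

      RowBuilder put(Object field) {  // one call per column, in schema order
        fields.add(field);
        return this;
      }

      List<Object> build() {          // materialize an independent row
        List<Object> row = new ArrayList<>(fields);
        fields.clear();               // reset for the next row
        return row;
      }

      public static void main(String[] args) {
        RowBuilder builder = new RowBuilder();
        System.out.println(builder.put(1).put("abc").build()); // [1, abc]
      }
    }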

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
new file mode 100644
index 0000000..96f465a
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestHeapTuple.java
@@ -0,0 +1,45 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.catalog.SchemaUtil;
+import org.junit.Test;
+
+public class TestHeapTuple {
+
+  @Test
+  public void testHeapTuple() {
+    OffHeapRowBlock rowBlock = TestOffHeapRowBlock.createRowBlock(1024);
+
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    int i = 0;
+    while (reader.next(zcTuple)) {
+      byte [] bytes = new byte[zcTuple.nioBuffer().limit()];
+      zcTuple.nioBuffer().get(bytes);
+
+      HeapTuple heapTuple = new HeapTuple(bytes, SchemaUtil.toDataTypes(TestOffHeapRowBlock.schema));
+      TestOffHeapRowBlock.validateTupleResult(i, heapTuple);
+      i++;
+    }
+
+    rowBlock.release();
+  }
+}
\ No newline at end of file
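
testHeapTuple materializes an on-heap copy of each zero-copy row by sizing a
byte[] from the tuple's NIO buffer limit and bulk-reading into it. The same
idiom with a bare ByteBuffer (assumes the buffer's position starts at 0, as it
does in the test):

    import java.nio.ByteBuffer;

    public class BufferCopyDemo {
      public static void main(String[] args) {
        ByteBuffer direct = ByteBuffer.allocateDirect(4);
        direct.putInt(42).flip();                 // position=0, limit=4

        byte[] onHeap = new byte[direct.limit()]; // heap destination
        direct.get(onHeap);                       // bulk copy off-heap -> heap

        System.out.println(ByteBuffer.wrap(onHeap).getInt()); // 42
      }
    }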

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
new file mode 100644
index 0000000..c43ba38
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestOffHeapRowBlock.java
@@ -0,0 +1,577 @@
+/***
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.ProtobufDatum;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.BaseTupleComparator;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.ProtoUtil;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.tajo.common.TajoDataTypes.Type;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestOffHeapRowBlock {
+  private static final Log LOG = LogFactory.getLog(TestOffHeapRowBlock.class);
+  public static String UNICODE_FIELD_PREFIX = "abc_가나다_";
+  public static Schema schema;
+
+  static {
+    schema = new Schema();
+    schema.addColumn("col0", Type.BOOLEAN);
+    schema.addColumn("col1", Type.INT2);
+    schema.addColumn("col2", Type.INT4);
+    schema.addColumn("col3", Type.INT8);
+    schema.addColumn("col4", Type.FLOAT4);
+    schema.addColumn("col5", Type.FLOAT8);
+    schema.addColumn("col6", Type.TEXT);
+    schema.addColumn("col7", Type.TIMESTAMP);
+    schema.addColumn("col8", Type.DATE);
+    schema.addColumn("col9", Type.TIME);
+    schema.addColumn("col10", Type.INTERVAL);
+    schema.addColumn("col11", Type.INET4);
+    schema.addColumn("col12",
+        CatalogUtil.newDataType(TajoDataTypes.Type.PROTOBUF, PrimitiveProtos.StringProto.class.getName()));
+  }
+
+  private void explainRowBlockAllocation(OffHeapRowBlock rowBlock, long startTime, long endTime) {
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " allocated in "
+        + (endTime - startTime) + " msec");
+  }
+
+  @Test
+  public void testPutAndReadValidation() {
+    int rowNum = 1000;
+
+    long allocStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
+    long allocEnd = System.currentTimeMillis();
+    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRow(i, rowBlock.getWriter());
+
+      reader.reset();
+      int j = 0;
+      while(reader.next(tuple)) {
+        validateTupleResult(j, tuple);
+
+        j++;
+      }
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+    long readStart = System.currentTimeMillis();
+    tuple = new ZeroCopyTuple();
+    int j = 0;
+    reader.reset();
+    while(reader.next(tuple)) {
+      validateTupleResult(j, tuple);
+      j++;
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testNullityValidation() {
+    int rowNum = 1000;
+
+    long allocStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 1024);
+    long allocEnd = System.currentTimeMillis();
+    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRowBlockWithNull(i, rowBlock.getWriter());
+
+      reader.reset();
+      int j = 0;
+      while(reader.next(tuple)) {
+        validateNullity(j, tuple);
+
+        j++;
+      }
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and nullity validating take " + (writeEnd - writeStart) +" msec");
+
+    long readStart = System.currentTimeMillis();
+    tuple = new ZeroCopyTuple();
+    int j = 0;
+    reader.reset();
+    while(reader.next(tuple)) {
+      validateNullity(j, tuple);
+
+      j++;
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testEmptyRow() {
+    int rowNum = 1000;
+
+    long allocStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 10);
+    long allocEnd = System.currentTimeMillis();
+    explainRowBlockAllocation(rowBlock, allocStart, allocEnd);
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      rowBlock.getWriter().startRow();
+      // empty columns
+      rowBlock.getWriter().endRow();
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing tooks " + (writeEnd - writeStart) + " msec");
+
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    int j = 0;
+    reader.reset();
+    while(reader.next(tuple)) {
+      j++;
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+    assertEquals(rowNum, j);
+    assertEquals(rowNum, rowBlock.rows());
+
+    rowBlock.release();
+  }
+
+  @Test
+  public void testSortBenchmark() {
+    int rowNum = 1000;
+
+    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+
+    List<ZeroCopyTuple> unSafeTuples = Lists.newArrayList();
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    reader.reset();
+    while(reader.next(tuple)) {
+      unSafeTuples.add(tuple);
+      tuple = new ZeroCopyTuple();
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    SortSpec sortSpec = new SortSpec(new Column("col2", Type.INT4));
+    BaseTupleComparator comparator = new BaseTupleComparator(schema, new SortSpec[] {sortSpec});
+
+    long sortStart = System.currentTimeMillis();
+    Collections.sort(unSafeTuples, comparator);
+    long sortEnd = System.currentTimeMillis();
+    LOG.info("sorting took " + (sortEnd - sortStart) + " msec");
+    rowBlock.release();
+  }
+
+  @Test
+  public void testVTuplePutAndGetBenchmark() {
+    int rowNum = 1000;
+
+    List<VTuple> rowBlock = Lists.newArrayList();
+    long writeStart = System.currentTimeMillis();
+    VTuple tuple;
+    for (int i = 0; i < rowNum; i++) {
+      tuple = new VTuple(schema.size());
+      fillVTuple(i, tuple);
+      rowBlock.add(tuple);
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+    long readStart = System.currentTimeMillis();
+    int j = 0;
+    for (VTuple t : rowBlock) {
+      validateTupleResult(j, t);
+      j++;
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    int count = 0;
+    for (int l = 0; l < rowBlock.size(); l++) {
+      for(int m = 0; m < schema.size(); m++ ) {
+        if (rowBlock.get(l).contains(m) && rowBlock.get(l).get(m).type() == Type.INT4) {
+          count ++;
+        }
+      }
+    }
+    // Prevent the JIT from eliminating the loop above as dead code.
+    LOG.info("The number of INT4 values is " + count + ".");
+  }
+
+  @Test
+  public void testVTuplePutAndGetBenchmarkViaDirectRowEncoder() {
+    int rowNum = 1000;
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 100);
+
+    long writeStart = System.currentTimeMillis();
+    VTuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < rowNum; i++) {
+      fillVTuple(i, tuple);
+
+      RowStoreUtil.convert(tuple, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("Writing takes " + (writeEnd - writeStart) + " msec");
+
+    validateResults(rowBlock);
+    rowBlock.release();
+  }
+
+  @Test
+  public void testSerDerOfRowBlock() {
+    int rowNum = 1000;
+
+    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+
+    ByteBuffer bb = rowBlock.nioBuffer();
+    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
+    validateResults(restoredRowBlock);
+    rowBlock.release();
+  }
+
+  @Test
+  public void testSerDerOfZeroCopyTuple() {
+    int rowNum = 1000;
+
+    OffHeapRowBlock rowBlock = createRowBlock(rowNum);
+
+    ByteBuffer bb = rowBlock.nioBuffer();
+    OffHeapRowBlock restoredRowBlock = new OffHeapRowBlock(schema, bb);
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(restoredRowBlock);
+
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    ZeroCopyTuple copyTuple = new ZeroCopyTuple();
+    int j = 0;
+    reader.reset();
+    while(reader.next(tuple)) {
+      ByteBuffer copy = tuple.nioBuffer();
+      copyTuple.set(copy, SchemaUtil.toDataTypes(schema));
+
+      validateTupleResult(j, copyTuple);
+
+      j++;
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("reading takes " + (readEnd - readStart) + " msec");
+
+    rowBlock.release();
+  }
+
+  public static OffHeapRowBlock createRowBlock(int rowNum) {
+    long allocateStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
+    long allocatedEnd = System.currentTimeMillis();
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " allocated in "
+        + (allocatedEnd - allocateStart) + " msec");
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRow(i, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing takes " + (writeEnd - writeStart) + " msec");
+
+    return rowBlock;
+  }
+
+  public static OffHeapRowBlock createRowBlockWithNull(int rowNum) {
+    long allocateStart = System.currentTimeMillis();
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, StorageUnit.MB * 8);
+    long allocatedEnd = System.currentTimeMillis();
+    LOG.info(FileUtil.humanReadableByteCount(rowBlock.size(), true) + " allocated in "
+        + (allocatedEnd - allocateStart) + " msec");
+
+    long writeStart = System.currentTimeMillis();
+    for (int i = 0; i < rowNum; i++) {
+      fillRowBlockWithNull(i, rowBlock.getWriter());
+    }
+    long writeEnd = System.currentTimeMillis();
+    LOG.info("writing and validating take " + (writeEnd - writeStart) + " msec");
+
+    return rowBlock;
+  }
+
+  public static void fillRow(int i, RowWriter builder) {
+    builder.startRow();
+    builder.putBool(i % 1 == 0);                // 0 (always true)
+    builder.putInt2((short) 1);                 // 1
+    builder.putInt4(i);                         // 2
+    builder.putInt8(i);                         // 3
+    builder.putFloat4(i);                       // 4
+    builder.putFloat8(i);                       // 5
+    builder.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
+    builder.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+    builder.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+    builder.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+    builder.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+    builder.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+    builder.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+    builder.endRow();
+  }
+
+  public static void fillRowBlockWithNull(int i, RowWriter writer) {
+    writer.startRow();
+
+    if (i == 0) {
+      writer.skipField();
+    } else {
+      writer.putBool(i % 1 == 0);                // 0 (always true)
+    }
+    if (i % 1 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt2((short) 1);                 // 1
+    }
+
+    if (i % 2 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt4(i);                         // 2
+    }
+
+    if (i % 3 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInt8(i);                         // 3
+    }
+
+    if (i % 4 == 0) {
+      writer.skipField();
+    } else {
+      writer.putFloat4(i);                       // 4
+    }
+
+    if (i % 5 == 0) {
+      writer.skipField();
+    } else {
+      writer.putFloat8(i);                       // 5
+    }
+
+    if (i % 6 == 0) {
+      writer.skipField();
+    } else {
+      writer.putText((UNICODE_FIELD_PREFIX + i).getBytes());  // 6
+    }
+
+    if (i % 7 == 0) {
+      writer.skipField();
+    } else {
+      writer.putTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i); // 7
+    }
+
+    if (i % 8 == 0) {
+      writer.skipField();
+    } else {
+      writer.putDate(DatumFactory.createDate("2014-04-16").asInt4() + i); // 8
+    }
+
+    if (i % 9 == 0) {
+      writer.skipField();
+    } else {
+      writer.putTime(DatumFactory.createTime("08:48:00").asInt8() + i); // 9
+    }
+
+    if (i % 10 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInterval(DatumFactory.createInterval((i + 1) + " hours")); // 10
+    }
+
+    if (i % 11 == 0) {
+      writer.skipField();
+    } else {
+      writer.putInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i); // 11
+    }
+
+    if (i % 12 == 0) {
+      writer.skipField();
+    } else {
+      writer.putProtoDatum(new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12
+    }
+
+    writer.endRow();
+  }
+
+  public static void fillVTuple(int i, VTuple tuple) {
+    tuple.put(0, DatumFactory.createBool(i % 1 == 0));
+    tuple.put(1, DatumFactory.createInt2((short) 1));
+    tuple.put(2, DatumFactory.createInt4(i));
+    tuple.put(3, DatumFactory.createInt8(i));
+    tuple.put(4, DatumFactory.createFloat4(i));
+    tuple.put(5, DatumFactory.createFloat8(i));
+    tuple.put(6, DatumFactory.createText((UNICODE_FIELD_PREFIX + i).getBytes()));
+    tuple.put(7, DatumFactory.createTimestamp(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + i)); // 7
+    tuple.put(8, DatumFactory.createDate(DatumFactory.createDate("2014-04-16").asInt4() + i)); // 8
+    tuple.put(9, DatumFactory.createTime(DatumFactory.createTime("08:48:00").asInt8() + i)); // 9
+    tuple.put(10, DatumFactory.createInterval((i + 1) + " hours")); // 10
+    tuple.put(11, DatumFactory.createInet4(DatumFactory.createInet4("192.168.0.1").asInt4() + i)); // 11
+    tuple.put(12, new ProtobufDatum(ProtoUtil.convertString(i + ""))); // 12;
+  }
+
+  public static void validateResults(OffHeapRowBlock rowBlock) {
+    long readStart = System.currentTimeMillis();
+    ZeroCopyTuple tuple = new ZeroCopyTuple();
+    int j = 0;
+    OffHeapRowBlockReader reader = new OffHeapRowBlockReader(rowBlock);
+    reader.reset();
+    while(reader.next(tuple)) {
+      validateTupleResult(j, tuple);
+      j++;
+    }
+    long readEnd = System.currentTimeMillis();
+    LOG.info("Reading takes " + (readEnd - readStart) + " msec");
+  }
+
+  public static void validateTupleResult(int j, Tuple t) {
+    assertTrue((j % 1 == 0) == t.getBool(0));
+    assertTrue(1 == t.getInt2(1));
+    assertEquals(j, t.getInt4(2));
+    assertEquals(j, t.getInt8(3));
+    assertTrue(j == t.getFloat4(4));
+    assertTrue(j == t.getFloat8(5));
+    assertEquals(UNICODE_FIELD_PREFIX + j, t.getText(6));
+    assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, t.getInt8(7));
+    assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, t.getInt4(8));
+    assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, t.getInt8(9));
+    assertEquals(DatumFactory.createInterval((j + 1) + " hours"), t.getInterval(10));
+    assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, t.getInt4(11));
+    assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), t.getProtobufDatum(12));
+  }
+
+  public static void validateNullity(int j, Tuple tuple) {
+    if (j == 0) {
+      assertTrue(tuple.isNull(0));
+    } else {
+      assertTrue((j % 1 == 0) == tuple.getBool(0));
+    }
+
+    if (j % 1 == 0) {
+      assertTrue(tuple.isNull(1));
+    } else {
+      assertTrue(1 == tuple.getInt2(1));
+    }
+
+    if (j % 2 == 0) {
+      assertTrue(tuple.isNull(2));
+    } else {
+      assertEquals(j, tuple.getInt4(2));
+    }
+
+    if (j % 3 == 0) {
+      assertTrue(tuple.isNull(3));
+    } else {
+      assertEquals(j, tuple.getInt8(3));
+    }
+
+    if (j % 4 == 0) {
+      assertTrue(tuple.isNull(4));
+    } else {
+      assertTrue(j == tuple.getFloat4(4));
+    }
+
+    if (j % 5 == 0) {
+      assertTrue(tuple.isNull(5));
+    } else {
+      assertTrue(j == tuple.getFloat8(5));
+    }
+
+    if (j % 6 == 0) {
+      assertTrue(tuple.isNull(6));
+    } else {
+      assertEquals(UNICODE_FIELD_PREFIX + j, tuple.getText(6));
+    }
+
+    if (j % 7 == 0) {
+      assertTrue(tuple.isNull(7));
+    } else {
+      assertEquals(DatumFactory.createTimestamp("2014-04-16 08:48:00").asInt8() + (long) j, tuple.getInt8(7));
+    }
+
+    if (j % 8 == 0) {
+      assertTrue(tuple.isNull(8));
+    } else {
+      assertEquals(DatumFactory.createDate("2014-04-16").asInt4() + j, tuple.getInt4(8));
+    }
+
+    if (j % 9 == 0) {
+      assertTrue(tuple.isNull(9));
+    } else {
+      assertEquals(DatumFactory.createTime("08:48:00").asInt8() + j, tuple.getInt8(9));
+    }
+
+    if (j % 10 == 0) {
+      assertTrue(tuple.isNull(10));
+    } else {
+      assertEquals(DatumFactory.createInterval((j + 1) + " hours"), tuple.getInterval(10));
+    }
+
+    if (j % 11 == 0) {
+      assertTrue(tuple.isNull(11));
+    } else {
+      assertEquals(DatumFactory.createInet4("192.168.0.1").asInt4() + j, tuple.getInt4(11));
+    }
+
+    if (j % 12 == 0) {
+      assertTrue(tuple.isNull(12));
+    } else {
+      assertEquals(new ProtobufDatum(ProtoUtil.convertString(j + "")), tuple.getProtobufDatum(12));
+    }
+  }
+}
\ No newline at end of file
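
For reference, the null pattern that fillRowBlockWithNull writes and
validateNullity checks: column 0 is null only in row 0, and column k (1..12)
is null in row i exactly when i % k == 0, so column 1 is always null. A tiny
standalone check of that rule:

    public class NullPatternDemo {
      public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
          StringBuilder row = new StringBuilder("row " + i + " null columns:");
          for (int k = 0; k <= 12; k++) {
            boolean isNull = (k == 0) ? (i == 0) : (i % k == 0);
            if (isNull) {
              row.append(' ').append(k);
            }
          }
          System.out.println(row);
        }
      }
    }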

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
new file mode 100644
index 0000000..1eb9c17
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/java/org/apache/tajo/tuple/offheap/TestResizableSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.tuple.offheap;
+
+import org.apache.tajo.unit.StorageUnit;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class TestResizableSpec {
+
+  @Test
+  public void testResizableLimit() {
+    ResizableLimitSpec limit = new ResizableLimitSpec(10 * StorageUnit.MB, 1000 * StorageUnit.MB, 0.1f, 1.0f);
+
+    long expectedMaxSize = (long) (1000 * StorageUnit.MB + (1000 * StorageUnit.MB * 0.1f));
+
+    assertEquals(expectedMaxSize, limit.limit());
+
+    assertEquals(20971520, limit.increasedSize(10 * StorageUnit.MB));
+
+    assertEquals(expectedMaxSize, limit.increasedSize(1600 * StorageUnit.MB));
+
+    assertEquals(0.98f, limit.remainRatio(980 * StorageUnit.MB), 0.1);
+
+    assertFalse(limit.canIncrease(limit.limit()));
+  }
+
+  @Test
+  public void testFixedLimit() {
+    FixedSizeLimitSpec limit = new FixedSizeLimitSpec(100 * StorageUnit.MB, 0.0f);
+
+    assertEquals(100 * StorageUnit.MB, limit.limit());
+
+    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1000));
+
+    assertEquals(100 * StorageUnit.MB, limit.increasedSize(1600 * StorageUnit.MB));
+
+    assertEquals(0.98f, limit.remainRatio(98 * StorageUnit.MB), 0.0f);
+
+    assertFalse(limit.canIncrease(limit.limit()));
+  }
+}
\ No newline at end of file
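
The assertions pin down the growth policy without showing it: increasedSize
doubles the given size (10 MB becomes 20971520 bytes, i.e. 20 MB) but is
clamped at limit(), which the test computes as maxSize plus
maxSize * allowedOverflow. A policy sketch consistent with those assertions
(inferred from the test, not copied from ResizableLimitSpec's source):

    public class GrowthPolicyDemo {
      static final long MB = 1024 * 1024;

      static long limit(long maxSize, float allowedOverflow) {
        return (long) (maxSize + maxSize * allowedOverflow);
      }

      static long increasedSize(long current, long maxSize, float allowedOverflow) {
        long cap = limit(maxSize, allowedOverflow);
        return Math.min(current * 2, cap); // double, but never past the cap
      }

      public static void main(String[] args) {
        System.out.println(increasedSize(10 * MB, 1000 * MB, 0.1f));  // 20971520
        System.out.println(
            increasedSize(1600 * MB, 1000 * MB, 0.1f) == limit(1000 * MB, 0.1f)); // true
      }
    }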

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
new file mode 100644
index 0000000..d1c561b
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -0,0 +1,164 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<configuration>
+  <property>
+    <name>fs.s3.impl</name>
+    <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
+  </property>
+
+  <!-- Storage Manager Configuration -->
+  <property>
+    <name>tajo.storage.manager.hdfs.class</name>
+    <value>org.apache.tajo.storage.FileStorageManager</value>
+  </property>
+  <property>
+    <name>tajo.storage.manager.hbase.class</name>
+    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+  </property>
+
+  <!--- Registered Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler</name>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+  </property>
+
+  <!--- Fragment Class Configurations -->
+  <property>
+    <name>tajo.storage.fragment.csv.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.raw.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.rcfile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.row.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.trevni.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.parquet.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.sequencefile.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+  <property>
+    <name>tajo.storage.fragment.avro.class</name>
+    <value>org.apache.tajo.storage.fragment.FileFragment</value>
+  </property>
+
+  <!--- Scanner Handler -->
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.scanner-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroScanner</value>
+  </property>
+
+  <!--- Appender Handler -->
+  <property>
+    <name>tajo.storage.appender-handler</name>
+    <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rcfile.class</name>
+    <value>org.apache.tajo.storage.rcfile.RCFile$RCFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.parquet.class</name>
+    <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.sequencefile.class</name>
+    <value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.appender-handler.avro.class</name>
+    <value>org.apache.tajo.storage.avro.AvroAppender</value>
+  </property>
+</configuration>
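
Each scanner-handler and appender-handler key above maps a format name to an
implementation class, so a storage manager can resolve the class reflectively
at runtime. A hypothetical lookup using Hadoop's Configuration (Tajo's actual
resolution code may differ):

    import org.apache.hadoop.conf.Configuration;

    public class ScannerLookupDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("storage-default.xml"); // the file above, on the classpath

        String format = "csv"; // e.g. taken from a table's store type
        Class<?> scannerClass =
            conf.getClass("tajo.storage.scanner-handler." + format + ".class", null);
        System.out.println("scanner for " + format + ": " + scannerClass);
      }
    }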

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml
new file mode 100644
index 0000000..e37149d
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/pom.xml
@@ -0,0 +1,349 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Copyright 2012 Database Lab., Korea Univ.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.9.1-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-hbase</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo HBase Storage</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+      </url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.6</source>
+          <target>1.6</target>
+          <encoding>${project.build.sourceEncoding}</encoding>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test>TRUE</tajo.test>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>2.4</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-protobuf-generated-sources-directory</id>
+            <phase>initialize</phase>
+            <configuration>
+              <target>
+                <mkdir dir="target/generated-sources/proto" />
+              </target>
+            </configuration>
+            <goals>
+              <goal>run</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2</version>
+        <executions>
+          <execution>
+            <id>generate-sources</id>
+            <phase>generate-sources</phase>
+            <configuration>
+              <executable>protoc</executable>
+              <arguments>
+                <argument>-Isrc/main/proto/</argument>
+                <argument>--proto_path=../../tajo-common/src/main/proto</argument>
+                <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument>
+                <argument>--java_out=target/generated-sources/proto</argument>
+                <argument>src/main/proto/StorageFragmentProtos.proto</argument>
+              </arguments>
+            </configuration>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>build-helper-maven-plugin</artifactId>
+        <version>1.5</version>
+        <executions>
+          <execution>
+            <id>add-source</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>add-source</goal>
+            </goals>
+            <configuration>
+              <sources>
+                <source>target/generated-sources/proto</source>
+              </sources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>zookeeper</artifactId>
+          <groupId>org.apache.zookeeper</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>slf4j-api</artifactId>
+          <groupId>org.slf4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jersey-json</artifactId>
+          <groupId>com.sun.jersey</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>commons-el</groupId>
+          <artifactId>commons-el</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>jasper-compiler</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jsp-2.1-jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey.jersey-test-framework</groupId>
+          <artifactId>jersey-test-framework-grizzly2</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-server-tests</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-app</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-yarn-api</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-hs</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-mapreduce-client-core</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+        <version>2.15</version>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
new file mode 100644
index 0000000..8615235
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.util.TUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An abstract class for HBase appender.
+ */
+public abstract class AbstractHBaseAppender implements Appender {
+  protected Configuration conf;
+  protected Schema schema;
+  protected TableMeta meta;
+  protected QueryUnitAttemptId taskAttemptId;
+  protected Path stagingDir;
+  protected boolean inited = false;
+
+  protected ColumnMapping columnMapping;
+  protected TableStatistics stats;
+  protected boolean enabledStats;
+
+  protected int columnNum;
+
+  protected byte[][][] mappingColumnFamilies;
+  protected boolean[] isBinaryColumns;
+  protected boolean[] isRowKeyMappings;
+  protected boolean[] isColumnKeys;
+  protected boolean[] isColumnValues;
+  protected int[] rowKeyFieldIndexes;
+  protected int[] rowkeyColumnIndexes;
+  protected char rowKeyDelimiter;
+
+  // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping
+  protected int[] columnKeyValueDataIndexes;
+  protected byte[][] columnKeyDatas;
+  protected byte[][] columnValueDatas;
+  protected byte[][] columnKeyCfNames;
+
+  protected KeyValue[] keyValues;
+
+  public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
+                       Schema schema, TableMeta meta, Path stagingDir) {
+    this.conf = conf;
+    this.schema = schema;
+    this.meta = meta;
+    this.stagingDir = stagingDir;
+    this.taskAttemptId = taskAttemptId;
+  }
+
+  @Override
+  public void init() throws IOException {
+    if (inited) {
+      throw new IllegalStateException("Appender is already initialized.");
+    }
+    inited = true;
+    if (enabledStats) {
+      stats = new TableStatistics(this.schema);
+    }
+    columnMapping = new ColumnMapping(schema, meta);
+
+    mappingColumnFamilies = columnMapping.getMappingColumns();
+
+    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
+    List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
+    for (int i = 0; i < isRowKeyMappings.length; i++) {
+      if (isRowKeyMappings[i]) {
+        rowkeyColumnIndexList.add(i);
+      }
+    }
+    rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);
+
+    isBinaryColumns = columnMapping.getIsBinaryColumns();
+    isColumnKeys = columnMapping.getIsColumnKeys();
+    isColumnValues = columnMapping.getIsColumnValues();
+    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
+    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
+
+    this.columnNum = schema.size();
+
+    // In the case of '<cfname>:key:' or '<cfname>:value:', the KeyValue object should be set with the qualifier
+    // and value which are mapped to the same column family.
+    columnKeyValueDataIndexes = new int[isColumnKeys.length];
+    int index = 0;
+    int numKeyValues = 0;
+    Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
+    for (int i = 0; i < isColumnKeys.length; i++) {
+      if (isRowKeyMappings[i]) {
+        continue;
+      }
+      if (isColumnKeys[i] || isColumnValues[i]) {
+        String cfName = new String(mappingColumnFamilies[i][0]);
+        if (!cfNameIndexMap.containsKey(cfName)) {
+          cfNameIndexMap.put(cfName, index);
+          columnKeyValueDataIndexes[i] = index;
+          index++;
+          numKeyValues++;
+        } else {
+          columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
+        }
+      } else {
+        numKeyValues++;
+      }
+    }
+    columnKeyCfNames = new byte[cfNameIndexMap.size()][];
+    for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) {
+      columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
+    }
+    columnKeyDatas = new byte[cfNameIndexMap.size()][];
+    columnValueDatas = new byte[cfNameIndexMap.size()][];
+
+    keyValues = new KeyValue[numKeyValues];
+  }
+
+  private ByteArrayOutputStream bout = new ByteArrayOutputStream();
+
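+  /**
+   * Builds the HBase row key for the given tuple. When multiple columns are mapped to the
+   * row key, their serialized values are concatenated with the configured row key delimiter.
+   */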
+  protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
+    Datum datum;
+    byte[] rowkey;
+    if (rowkeyColumnIndexes.length > 1) {
+      bout.reset();
+      for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
+        datum = tuple.get(rowkeyColumnIndexes[i]);
+        if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
+          rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+        } else {
+          rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
+        }
+        bout.write(rowkey);
+        if (i < rowkeyColumnIndexes.length - 1) {
+          bout.write(rowKeyDelimiter);
+        }
+      }
+      rowkey = bout.toByteArray();
+    } else {
+      int index = rowkeyColumnIndexes[0];
+      datum = tuple.get(index);
+      if (isBinaryColumns[index]) {
+        rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
+      } else {
+        rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
+      }
+    }
+
+    return rowkey;
+  }
+
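+  /**
+   * Converts the non-rowkey columns of a tuple into KeyValue cells. Columns mapped as
+   * '<cfname>:key:' or '<cfname>:value:' are combined into one cell per column family.
+   */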
+  protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
+    int keyValIndex = 0;
+    for (int i = 0; i < columnNum; i++) {
+      if (isRowKeyMappings[i]) {
+        continue;
+      }
+      Datum datum = tuple.get(i);
+      byte[] value;
+      if (isBinaryColumns[i]) {
+        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
+      } else {
+        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
+      }
+
+      if (isColumnKeys[i]) {
+        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
+      } else if (isColumnValues[i]) {
+        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
+      } else {
+        keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
+        keyValIndex++;
+      }
+    }
+
+    for (int i = 0; i < columnKeyDatas.length; i++) {
+      keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
+    }
+  }
+
+  @Override
+  public void enableStats() {
+    enabledStats = true;
+  }
+
+  @Override
+  public TableStats getStats() {
+    if (enabledStats) {
+      return stats.getTableStat();
+    } else {
+      return null;
+    }
+  }
+}
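
As a point of reference, here is a minimal sketch of a concrete appender built on the abstract class above. The subclass name and the HTableInterface wiring are illustrative assumptions, not part of this patch; only getRowKeyBytes(), readKeyValues(), and the inherited keyValues array come from the code shown here. Besides the imports already present in the file, it would need org.apache.hadoop.hbase.client.HTableInterface and org.apache.hadoop.hbase.client.Put.

    // Hypothetical subclass, shown only to illustrate the intended call sequence.
    public class PutBasedHBaseAppender extends AbstractHBaseAppender {
      private HTableInterface htable; // assumed to be opened in init(), omitted here

      public PutBasedHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
                                   Schema schema, TableMeta meta, Path stagingDir) {
        super(conf, taskAttemptId, schema, meta, stagingDir);
      }

      @Override
      public void addTuple(Tuple tuple) throws IOException {
        byte[] rowkey = getRowKeyBytes(tuple); // serialize the mapped rowkey columns
        readKeyValues(tuple, rowkey);          // fill the inherited keyValues array
        Put put = new Put(rowkey);
        for (KeyValue eachKeyValue : keyValues) {
          put.add(eachKeyValue);               // Put.add(KeyValue) in the HBase 0.94/0.98 client
        }
        htable.put(put);
      }

      @Override
      public void flush() throws IOException { htable.flushCommits(); }

      @Override
      public void close() throws IOException { htable.close(); }
    }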

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
new file mode 100644
index 0000000..79161cc
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.logical.SortNode;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.logical.UnaryNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
+import org.apache.tajo.plan.util.PlannerUtil;
+
+public class AddSortForInsertRewriter implements RewriteRule {
+  private int[] sortColumnIndexes;
+  private Column[] sortColumns;
+  public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
+    this.sortColumns = sortColumns;
+    this.sortColumnIndexes = new int[sortColumns.length];
+
+    Schema tableSchema = tableDesc.getSchema();
+    for (int i = 0; i < sortColumns.length; i++) {
+      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+    }
+  }
+
+  @Override
+  public String getName() {
+    return "AddSortForInsertRewriter";
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    StoreType storeType = PlannerUtil.getStoreType(plan);
+    return storeType != null;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    UnaryNode insertNode = rootNode.getChild();
+    LogicalNode childNode = insertNode.getChild();
+
+    Schema sortSchema = childNode.getOutSchema();
+    SortNode sortNode = plan.createNode(SortNode.class);
+    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+    sortNode.setInSchema(sortSchema);
+    sortNode.setOutSchema(sortSchema);
+
+    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+
+    for (int i = 0; i < sortColumnIndexes.length; i++) {
+      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
+      if (sortColumn == null) {
+        throw new PlanningException("Can't find a proper sort column: " + sortColumns[i]);
+      }
+      sortSpecs[i] = new SortSpec(sortColumn, true, true);
+    }
+    sortNode.setSortSpecs(sortSpecs);
+
+    sortNode.setChild(insertNode.getChild());
+    insertNode.setChild(sortNode);
+    plan.getRootBlock().registerNode(sortNode);
+
+    return plan;
+  }
+}
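
A short usage sketch for the rule (tableDesc, rowKeyColumns, and plan are assumed to come from the caller's context; only the constructor, isEligible(), and rewrite() are from this patch):

    RewriteRule rule = new AddSortForInsertRewriter(tableDesc, rowKeyColumns);
    if (rule.isEligible(plan)) {
      plan = rule.rewrite(plan); // a SortNode is injected between the insert node and its child
    }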

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
new file mode 100644
index 0000000..7ddf09a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.util.BytesUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ColumnMapping {
+  private TableMeta tableMeta;
+  private Schema schema;
+  private char rowKeyDelimiter;
+
+  private String hbaseTableName;
+
+  private int[] rowKeyFieldIndexes;
+  private boolean[] isRowKeyMappings;
+  private boolean[] isBinaryColumns;
+  private boolean[] isColumnKeys;
+  private boolean[] isColumnValues;
+
+  // indexed by schema column order; [i][0] = column family name bytes, [i][1] = qualifier name bytes
+  private byte[][][] mappingColumns;
+
+  private int numRowKeys;
+
+  public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
+    this.schema = schema;
+    this.tableMeta = tableMeta;
+
+    init();
+  }
+
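+  /**
+   * Parses the 'columns' table property. Each comma-separated token maps one Tajo column to
+   * ':key', '<cfname>:<qualifier>', '<cfname>:key:', or '<cfname>:value:'; a '#b' marker
+   * selects binary encoding for the mapped value.
+   */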
+  public void init() throws IOException {
+    hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
+    String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
+    if (delim.length() > 0) {
+      rowKeyDelimiter = delim.charAt(0);
+    }
+    isRowKeyMappings = new boolean[schema.size()];
+    rowKeyFieldIndexes = new int[schema.size()];
+    isBinaryColumns = new boolean[schema.size()];
+    isColumnKeys = new boolean[schema.size()];
+    isColumnValues = new boolean[schema.size()];
+
+    mappingColumns = new byte[schema.size()][][];
+
+    for (int i = 0; i < schema.size(); i++) {
+      rowKeyFieldIndexes[i] = -1;
+    }
+
+    String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
+    if (columnMapping == null || columnMapping.isEmpty()) {
+      throw new IOException("'columns' property is required.");
+    }
+
+    String[] columnMappingTokens = columnMapping.split(",");
+
+    if (columnMappingTokens.length != schema.getColumns().size()) {
+      throw new IOException("The number of mapped HBase columns is great than the number of Tajo table columns");
+    }
+
+    int index = 0;
+    for (String eachToken: columnMappingTokens) {
+      mappingColumns[index] = new byte[2][];
+
+      byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':');
+
+      if (mappingTokens.length == 3) {
+        if (mappingTokens[0].length == 0) {
+          // cfname
+          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
+              "or '<cfname>:value:' or '<cfname>:value:#b'");
+        }
+        //<cfname>:key: or <cfname>:value:
+        if (mappingTokens[2].length != 0) {
+          String binaryOption = new String(mappingTokens[2]);
+          if ("#b".equals(binaryOption)) {
+            isBinaryColumns[index] = true;
+          } else {
+            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
+                "or '<cfname>:value:' or '<cfname>:value:#b'");
+          }
+        }
+        mappingColumns[index][0] = mappingTokens[0];
+        String keyOrValue = new String(mappingTokens[1]);
+        if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+          isColumnKeys[index] = true;
+        } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
+          isColumnValues[index] = true;
+        } else {
+          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
+        }
+      } else if (mappingTokens.length == 2) {
+        //<cfname>: or <cfname>:<qualifier> or :key
+        String cfName = new String(mappingTokens[0]);
+        String columnName = new String(mappingTokens[1]);
+        RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName);
+        if (rowKeyMapping != null) {
+          isRowKeyMappings[index] = true;
+          numRowKeys++;
+          isBinaryColumns[index] = rowKeyMapping.isBinary();
+          if (!cfName.isEmpty()) {
+            if (rowKeyDelimiter == 0) {
+              throw new IOException("hbase.rowkey.delimiter is required.");
+            }
+            rowKeyFieldIndexes[index] = Integer.parseInt(cfName);
+          } else {
+            rowKeyFieldIndexes[index] = -1; // the rowkey is mapped to a single column.
+          }
+        } else {
+          if (cfName.isEmpty()) {
+            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
+          }
+          mappingColumns[index][0] = Bytes.toBytes(cfName);
+
+          if (columnName != null && !columnName.isEmpty()) {
+            String[] columnNameTokens = columnName.split("#");
+            if (columnNameTokens[0].isEmpty()) {
+              mappingColumns[index][1] = null;
+            } else {
+              mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]);
+            }
+            if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) {
+              isBinaryColumns[index] = true;
+            }
+          }
+        }
+      } else {
+        throw new IOException(eachToken + " 'column' attribute should be '<cfname>:<qualifier>', '<cfname>:key:' or '<cfname>:value:'");
+      }
+
+      index++;
+    } // for loop
+  }
+
+  public List<String> getColumnFamilyNames() {
+    List<String> cfNames = new ArrayList<String>();
+
+    for (byte[][] eachCfName: mappingColumns) {
+      if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) {
+        String cfName = new String(eachCfName[0]);
+        if (!cfNames.contains(cfName)) {
+          cfNames.add(cfName);
+        }
+      }
+    }
+
+    return cfNames;
+  }
+
+  private RowKeyMapping getRowKeyMapping(String cfName, String columnName) {
+    if (columnName == null || columnName.isEmpty()) {
+      return null;
+    }
+
+    String[] tokens = columnName.split("#");
+    if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) {
+      return null;
+    }
+
+    RowKeyMapping rowKeyMapping = new RowKeyMapping();
+
+    if (tokens.length == 2 && "b".equals(tokens[1])) {
+      rowKeyMapping.setBinary(true);
+    }
+
+    if (cfName != null && !cfName.isEmpty()) {
+      rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName));
+    }
+    return rowKeyMapping;
+  }
+
+  public char getRowKeyDelimiter() {
+    return rowKeyDelimiter;
+  }
+
+  public int[] getRowKeyFieldIndexes() {
+    return rowKeyFieldIndexes;
+  }
+
+  public boolean[] getIsRowKeyMappings() {
+    return isRowKeyMappings;
+  }
+
+  public byte[][][] getMappingColumns() {
+    return mappingColumns;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public boolean[] getIsBinaryColumns() {
+    return isBinaryColumns;
+  }
+
+  public String getHbaseTableName() {
+    return hbaseTableName;
+  }
+
+  public boolean[] getIsColumnKeys() {
+    return isColumnKeys;
+  }
+
+  public int getNumRowKeys() {
+    return numRowKeys;
+  }
+
+  public boolean[] getIsColumnValues() {
+    return isColumnValues;
+  }
+}
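
For illustration, a hedged sketch of declaring and inspecting a mapping. The KeyValueSet and TableMeta construction calls are assumptions about the catalog API, not part of this patch; the expected array contents follow from init() above.

    // 'employees': col1 -> row key, col2 -> 'info:name', col3 -> 'info:age' stored as binary
    KeyValueSet options = new KeyValueSet();                          // assumed helper class
    options.set(HBaseStorageConstants.META_TABLE_KEY, "employees");
    options.set(HBaseStorageConstants.META_COLUMNS_KEY, ":key,info:name,info:age#b");
    TableMeta meta = new TableMeta(StoreType.HBASE, options);         // assumed constructor

    ColumnMapping mapping = new ColumnMapping(schema, meta);          // schema has three columns
    boolean[] isRowKey = mapping.getIsRowKeyMappings();               // [true, false, false]
    boolean[] isBinary = mapping.getIsBinaryColumns();                // [false, false, true]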

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
new file mode 100644
index 0000000..c05c5bb
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+
+public class HBaseBinarySerializerDeserializer {
+
+  public static Datum deserialize(Column col, byte[] bytes) throws IOException {
+    Datum datum;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes));
+        break;
+      case INT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes));
+        break;
+      case INT8:
+        if (bytes == null || bytes.length == 0) {
+          datum = NullDatum.get();
+        } else if (bytes.length == 4) {
+          datum = DatumFactory.createInt8(Bytes.toInt(bytes));
+        } else {
+          datum = DatumFactory.createInt8(Bytes.toLong(bytes));
+        }
+        break;
+      case FLOAT4:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes));
+        break;
+      case FLOAT8:
+        datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes));
+        break;
+      case TEXT:
+        datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes);
+        break;
+      default:
+        datum = NullDatum.get();
+        break;
+    }
+    return datum;
+  }
+
+  public static byte[] serialize(Column col, Datum datum) throws IOException {
+    if (datum == null || datum instanceof NullDatum) {
+      return null;
+    }
+
+    byte[] bytes;
+    switch (col.getDataType().getType()) {
+      case INT1:
+      case INT2:
+        bytes = Bytes.toBytes(datum.asInt2());
+        break;
+      case INT4:
+        bytes = Bytes.toBytes(datum.asInt4());
+        break;
+      case INT8:
+        bytes = Bytes.toBytes(datum.asInt8());
+        break;
+      case FLOAT4:
+        bytes = Bytes.toBytes(datum.asFloat4());
+        break;
+      case FLOAT8:
+        bytes = Bytes.toBytes(datum.asFloat8());
+        break;
+      case TEXT:
+        bytes = Bytes.toBytes(datum.asChars());
+        break;
+      default:
+        bytes = null;
+        break;
+    }
+
+    return bytes;
+  }
+}
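
Finally, a small round-trip sketch for the binary serializer. The Column constructor is assumed from the catalog API; the encoding itself (for INT4, a 4-byte big-endian value via the Bytes utility) follows from the switch statements above.

    // Serialize an INT4 datum into an HBase cell value and read it back.
    Column ageColumn = new Column("age", TajoDataTypes.Type.INT4);
    byte[] cell = HBaseBinarySerializerDeserializer.serialize(ageColumn, DatumFactory.createInt4(29));
    Datum restored = HBaseBinarySerializerDeserializer.deserialize(ageColumn, cell);
    assert restored.asInt4() == 29;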