You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:49 UTC

[35/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
new file mode 100644
index 0000000..520c9f6
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -0,0 +1,1469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.hbase.*;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.InetAddress;
+import java.sql.ResultSet;
+import java.text.DecimalFormat;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+@Category(IntegrationTest.class)
+public class TestHBaseTable extends QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(TestHBaseTable.class);
+
+  /**
+   * Starts the embedded HBase mini-cluster once for the whole test class.
+   * Fails fast instead of swallowing startup errors: if the cluster cannot
+   * start, every test would otherwise fail later with confusing symptoms.
+   */
+  @BeforeClass
+  public static void beforeClass() {
+    try {
+      testingCluster.getHBaseUtil().startHBaseCluster();
+    } catch (Exception e) {
+      // Preserve the cause; do not let tests proceed without a cluster.
+      throw new RuntimeException("HBase mini-cluster startup failed", e);
+    }
+  }
+
+  /**
+   * Stops the embedded HBase mini-cluster after all tests have run.
+   * Shutdown failures are logged-and-swallowed (best effort) so that a
+   * teardown error cannot mask the real test results.
+   */
+  @AfterClass
+  public static void afterClass() {
+    try {
+      testingCluster.getHBaseUtil().stopHBaseCluster();
+    } catch (Exception e) {
+      // Best-effort teardown; keep the original test outcome visible.
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * CREATE TABLE for an HBase-mapped table must reject statements that omit
+   * any of the mandatory meta properties: 'table', 'columns', and the
+   * zookeeper quorum setting.
+   */
+  @Test
+  public void testVerifyCreateHBaseTableRequiredMeta() throws Exception {
+    // Case 1: missing the 'table' property.
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
+          "USING hbase").close();
+      fail("hbase table must have 'table' meta");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("HBase mapped table"));
+    }
+
+    // Case 2: 'table' present but 'columns' missing.
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
+          "USING hbase " +
+          "WITH ('table'='hbase_table')").close();
+      fail("hbase table must have 'columns' meta");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("'columns' property is required"));
+    }
+
+    // Case 3: 'table' and 'columns' present but no zookeeper quorum.
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
+          "USING hbase " +
+          "WITH ('table'='hbase_table', 'columns'='col1:,col2:')").close();
+      fail("hbase table must have 'hbase.zookeeper.quorum' meta");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("HBase mapped table"));
+    }
+  }
+
+  /**
+   * Creating a managed HBase-mapped table should create the backing HBase
+   * table (row key excluded from the column families), and DROP ... PURGE
+   * should remove the backing table as well.
+   */
+  @Test
+  public void testCreateHBaseTable() throws Exception {
+    String host = InetAddress.getLocalHost().getHostName();
+    String clientPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(clientPort);
+
+    executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + host + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + clientPort + "')").close();
+
+    assertTableExists("hbase_mapped_table1");
+
+    HTableDescriptor created = testingCluster.getHBaseUtil().getTableDescriptor("hbase_table");
+    assertNotNull(created);
+    assertEquals("hbase_table", created.getNameAsString());
+
+    // col1 maps to the row key, so only two column families remain.
+    HColumnDescriptor[] families = created.getColumnFamilies();
+    assertEquals(2, families.length);
+    assertEquals("col2", families[0].getNameAsString());
+    assertEquals("col3", families[1].getNameAsString());
+
+    executeString("DROP TABLE hbase_mapped_table1 PURGE").close();
+
+    // PURGE must also drop the backing HBase table.
+    HBaseAdmin admin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    try {
+      assertFalse(admin.tableExists("hbase_table"));
+    } finally {
+      admin.close();
+    }
+  }
+
+  /**
+   * CREATE EXTERNAL TABLE must fail when the referenced HBase table does not
+   * already exist.
+   */
+  @Test
+  public void testCreateNotExistsExternalHBaseTable() throws Exception {
+    String host = InetAddress.getLocalHost().getHostName();
+    String clientPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(clientPort);
+
+    try {
+      executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
+          "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " +
+          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + host + "'," +
+          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + clientPort + "')").close();
+      fail("External table should be a existed table.");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("External table should be a existed table."));
+    }
+  }
+
+  /**
+   * A column mapped to the HBase row key (or a row-key component) must be of
+   * TEXT type; table creation is expected to reject an int4 key field.
+   */
+  @Test
+  public void testCreateRowFieldWithNonText() throws Exception {
+    String host = InetAddress.getLocalHost().getHostName();
+    String clientPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(clientPort);
+
+    try {
+      executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, col3 text, col4 text) " +
+          "USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " +
+          "'hbase.rowkey.delimiter'='_', " +
+          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + host + "'," +
+          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + clientPort + "')").close();
+      fail("Key field type should be TEXT type");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Key field type should be TEXT type"));
+    }
+  }
+
+  /**
+   * Dropping an external HBase-mapped table (without PURGE) must leave the
+   * underlying HBase table intact.
+   */
+  @Test
+  public void testCreateExternalHBaseTable() throws Exception {
+    HTableDescriptor tableDef = new HTableDescriptor(TableName.valueOf("external_hbase_table_not_purge"));
+    for (String family : new String[]{"col1", "col2", "col3"}) {
+      tableDef.addFamily(new HColumnDescriptor(family));
+    }
+    testingCluster.getHBaseUtil().createTable(tableDef);
+
+    String host = InetAddress.getLocalHost().getHostName();
+    String clientPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(clientPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + host + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + clientPort + "')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    executeString("DROP TABLE external_hbase_mapped_table").close();
+
+    HBaseAdmin admin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    try {
+      // No PURGE: the HBase-side table must survive; clean it up manually.
+      assertTrue(admin.tableExists("external_hbase_table_not_purge"));
+      admin.disableTable("external_hbase_table_not_purge");
+      admin.deleteTable("external_hbase_table_not_purge");
+    } finally {
+      admin.close();
+    }
+  }
+
+  /**
+   * Loads 100 rows into a plain HBase table via the client API, then runs a
+   * row-key range SELECT through the external mapped table and compares the
+   * result against the stored expected output (assertResultSet).
+   */
+  @Test
+  public void testSimpleSelectQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col1"));
+    hTableDesc.addFamily(new HColumnDescriptor("col2"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    // Write directly through the storage manager's connection so the data
+    // bypasses Tajo's insert path entirely.
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(i).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+
+      // Row keys are stored as text, so '>' compares lexicographically.
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  /**
+   * Exercises binary ('#b') column mappings: the row key is a binary int8 and
+   * col3 a binary int4, so range predicates and projections must decode the
+   * raw bytes rather than compare text.
+   */
+  @Test
+  public void testBinaryMappedQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col1"));
+    hTableDesc.addFamily(new HColumnDescriptor("col2"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key#b,col1:a,col2:,col3:b#b', \n" +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "', \n" +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      for (int i = 0; i < 100; i++) {
+        // Binary-encoded long row key; col3 holds a binary-encoded int.
+        Put put = new Put(Bytes.toBytes((long) i));
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes(i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk > 20");
+      assertResultSet(res);
+      res.close();
+
+      //Projection
+      res = executeString("select col3, col2, rk from external_hbase_mapped_table where rk > 95");
+
+      // col2 has no qualifier mapping, so it materializes the whole family
+      // as a JSON-style key/value map.
+      String expected = "col3,col2,rk\n" +
+          "-------------------------------\n" +
+          "96,{\"k1\":\"k1-96\", \"k2\":\"k2-96\"},96\n" +
+          "97,{\"k1\":\"k1-97\", \"k2\":\"k2-97\"},97\n" +
+          "98,{\"k1\":\"k1-98\", \"k2\":\"k2-98\"},98\n" +
+          "99,{\"k1\":\"k1-99\", \"k2\":\"k2-99\"},99\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  /**
+   * Exercises the 'family:key:' / 'family:value:' mapping form, where the
+   * qualifiers and values of a column family are exposed as separate columns.
+   */
+  @Test
+  public void testColumnKeyValueSelectQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col2"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      // Each row gets five qualifiers in col2 plus one value in col3.
+      for (int i = 0; i < 10; i++) {
+        Put put = new Put(Bytes.toBytes("rk-" + i));
+        for (int j = 0; j < 5; j++) {
+          put.add("col2".getBytes(), ("key-" + j).getBytes(), Bytes.toBytes("value-" + j));
+        }
+        put.add("col3".getBytes(), "".getBytes(), ("col3-value-" + i).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 >= 'rk-0'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  /**
+   * Exercises composite row keys: '0:key' and '1:key' split each row key on
+   * the configured delimiter ('_') into two text columns.
+   */
+  @Test
+  public void testRowFieldSelectQuery() throws Exception {
+    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    hTableDesc.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(hTableDesc);
+
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, rk2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'='0:key,1:key,col3:a', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
+        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HTableInterface htable = hconn.getTable("external_hbase_table");
+
+    try {
+      for (int i = 0; i < 100; i++) {
+        // Row key "field1-i_field2-i" splits into (rk1, rk2) on '_'.
+        Put put = new Put(("field1-" + i + "_field2-" + i).getBytes());
+        put.add("col3".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 > 'field1-20'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  /**
+   * Verifies that row-key predicates prune scan fragments on a pre-split
+   * table (see assertIndexPredication) and that range/point lookups return
+   * the expected rows.
+   */
+  @Test
+  public void testIndexPredication() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    // NOTE(review): return value ignored — presumably meant to be asserted.
+    hAdmin.tableExists("hbase_table");
+
+    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+    try {
+      // Four split row keys produce five regions.
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      // Zero-pad keys to three digits so lexicographic order == numeric order.
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+      assertIndexPredication(false);
+
+      ResultSet res = executeString("select * from hbase_mapped_table where rk >= '020' and rk <= '055'");
+      assertResultSet(res);
+      res.close();
+
+      res = executeString("select * from hbase_mapped_table where rk = '021'");
+      String expected = "rk,col1,col2,col3\n" +
+          "-------------------------------\n" +
+          "021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      htable.close();
+      hAdmin.close();
+    }
+  }
+
+  /**
+   * Same as testIndexPredication, but with a two-part composite row key
+   * ('0:key'/'1:key' delimited by '_'). A point lookup on the first key part
+   * must match every row sharing that prefix; the raw-Scan section below
+   * demonstrates the start/stop-row convention the planner is expected to use.
+   */
+  @Test
+  public void testCompositeRowIndexPredication() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    // NOTE(review): return value ignored — presumably meant to be asserted.
+    hAdmin.tableExists("hbase_table");
+
+    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+    try {
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        // Composite key "NNN_NNN" with '_' as the configured delimiter.
+        Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+
+      // Raw HBase scan: prefix "021" up to "021_<MAX_VALUE>" (inclusive stop)
+      // covers exactly the rows whose first key component is "021".
+      Scan scan = new Scan();
+      scan.setStartRow("021".getBytes());
+      scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes());
+      Filter filter = new InclusiveStopFilter(scan.getStopRow());
+      scan.setFilter(filter);
+
+      ResultScanner scanner = htable.getScanner(scan);
+      Result result = scanner.next();
+      assertNotNull(result);
+      assertEquals("021_021", new String(result.getRow()));
+      scanner.close();
+
+      assertIndexPredication(true);
+
+      ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'");
+      String expected = "rk,rk2,col1,col2,col3\n" +
+          "-------------------------------\n" +
+          "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      htable.close();
+      hAdmin.close();
+    }
+  }
+
+  /**
+   * Asserts that StorageManager.getSplits() prunes and clips HBase fragments
+   * according to row-key predicates on 'hbase_mapped_table' (split row keys
+   * 010/040/060/080). Each case builds an EvalNode tree, sets it as the scan
+   * qualifier, and checks the resulting fragments' start/stop rows.
+   *
+   * @param isCompositeRowKey whether the table uses a delimited composite row
+   *        key; equality and upper bounds are then extended with a
+   *        Character.MAX_VALUE suffix so the whole key-prefix range is covered
+   */
+  private void assertIndexPredication(boolean isCompositeRowKey) throws Exception {
+    String postFix = isCompositeRowKey ? "_" + new String(new char[]{Character.MAX_VALUE}) : "";
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    ScanNode scanNode = new ScanNode(1);
+
+    // where rk = '021'
+    EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("021")));
+    scanNode.setQual(evalNodeEq);
+    StorageManager storageManager = StorageManager.getStorageManager(conf, StoreType.HBASE);
+    List<Fragment> fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(1, fragments.size());
+    assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
+    assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
+
+    // where rk >= '020' and rk <= '055'
+    // The range spans the 040 split, so it is cut into two fragments.
+    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("020")));
+    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("055")));
+    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+    scanNode.setQual(evalNodeA);
+
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(2, fragments.size());
+    HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    // where (rk >= '020' and rk <= '055') or rk = '075'
+    // Disjoint ranges stay separate: two clipped fragments plus a point one.
+    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("075")));
+    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+    scanNode.setQual(evalNodeB);
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(3, fragments.size());
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
+    assertEquals("075", new String(fragment3.getStartRow()));
+    assertEquals("075" + postFix, new String(fragment3.getStopRow()));
+
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
+    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("072")));
+    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("078")));
+    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(3, fragments.size());
+
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    fragment3 = (HBaseFragment) fragments.get(2);
+    assertEquals("072", new String(fragment3.getStartRow()));
+    assertEquals("078" + postFix, new String(fragment3.getStopRow()));
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
+    // Both ranges fall in the region below split key 060, so the two ranges
+    // merge into a single pair of fragments ending at '059'.
+    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("057")));
+    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("059")));
+    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(2, fragments.size());
+
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("059" + postFix, new String(fragment2.getStopRow()));
+  }
+
+  /**
+   * Runs an unfiltered full scan (non-forward query) against a pre-split
+   * HBase table and checks the result against the stored expected output.
+   */
+  @Test
+  public void testNonForwardQuery() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    HTable htable = null;
+    try {
+      // Assert the backing table exists (result was silently ignored before).
+      assertTrue(hAdmin.tableExists("hbase_table"));
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+      // Four split row keys produce five regions.
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from hbase_mapped_table");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      hAdmin.close();
+      // BUG FIX: original tested 'htable == null' before closing, which would
+      // NPE when creation failed and leak the table when it succeeded.
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * Joins the HBase-mapped table (binary int8 col3) against the default
+   * lineitem table on col3 = l_orderkey and compares against the stored
+   * expected output.
+   */
+  @Test
+  public void testJoin() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int8) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    HTable htable = null;
+    try {
+      // NOTE(review): return value ignored — presumably meant to be asserted.
+      hAdmin.tableExists("hbase_table");
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+      // Four split row keys produce five regions.
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        // Join column: binary-encoded long matching l_orderkey.
+        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes((long) i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select a.rk, a.col1, a.col2, a.col3, b.l_orderkey, b.l_linestatus " +
+          "from hbase_mapped_table a " +
+          "join default.lineitem b on a.col3 = b.l_orderkey");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      hAdmin.close();
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * INSERT INTO an HBase-mapped table from default.lineitem, then reads the
+   * rows back through a raw HBase Scan and compares the serialized result
+   * (col3 is stored in binary form, hence the 'true' flag in the binaries
+   * array passed to resultSetToString).
+   */
+  @Test
+  public void testInsertInto() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    executeString("insert into hbase_mapped_table " +
+        "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * INSERT INTO a pre-split (multi-region) HBase-mapped table. Source rows are
+   * generated in descending key order into a CSV-backed base table, so the
+   * insert path must sort/route rows to the correct regions.
+   */
+  @Test
+  public void testInsertIntoMultiRegion() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    // Descending order on purpose: zero-padded keys 099..000.
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * Variant of testInsertIntoMultiRegion with single-character split keys
+   * ('1'..'9') and non-zero-padded row keys, exercising lexicographic key
+   * routing across the resulting regions.
+   */
+  @Test
+  public void testInsertIntoMultiRegion2() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    // Keys 99..0 without zero padding, so they sort lexicographically.
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * Same scenario as testInsertIntoMultiRegion, but the region split keys are
+   * supplied via the 'hbase.split.rowkeys.file' table option pointing at a
+   * file in the test dataset directory instead of an inline list.
+   */
+  @Test
+  public void testInsertIntoMultiRegionWithSplitFile() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    String splitFilePath = currentDatasetPath + "/splits.data";
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys.file'='" + splitFilePath + "', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * INSERT INTO a multi-region table whose row key is composed of two columns
+   * (mapped as '0:key' and '1:key') joined by the '_' delimiter declared via
+   * 'hbase.rowkey.delimiter'.
+   */
+  @Test
+  public void testInsertIntoMultiRegionMultiRowFields() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " +
+        "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id1", Type.TEXT);
+    schema.addColumn("id2", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    DecimalFormat df = new DecimalFormat("000");
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|" + (i + 100) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id1, id2, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, null, Bytes.toBytes("col1")},
+          new byte[][]{null, null, Bytes.toBytes("a")},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * INSERT INTO a multi-region table whose row key is stored in binary form
+   * (':key#b' mapping with an int4 column); the 'true' flag passed to
+   * resultSetToString makes the verifier decode the row key as binary.
+   */
+  @Test
+  public void testInsertIntoBinaryMultiRegion() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{true, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * Exercises the column-family key/value mapping ('col2:key:' / 'col2:value:'),
+   * where multiple qualifiers per row are exposed to Tajo as key and value
+   * arrays. Inserts 3 qualifiers for each of 21 row keys, verifies the raw
+   * HBase contents, then verifies the SQL-level projection (rows come back in
+   * lexicographic row-key order, hence 0,1,10,11,...).
+   */
+  @Test
+  public void testInsertIntoColumnKeyValue() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("rk", Type.TEXT);
+    schema.addColumn("col2_key", Type.TEXT);
+    schema.addColumn("col2_value", Type.TEXT);
+    schema.addColumn("col3", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    // 3 qualifier rows (j) for each of the 21 row keys (i).
+    for (int i = 20; i >= 0; i--) {
+      for (int j = 0; j < 3; j++) {
+        datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
+      }
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select rk, col2_key, col2_value, col3 from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, null, null},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+      ResultSet res = executeString("select * from hbase_mapped_table");
+
+      String expected = "rk,col2_key,col2_value,col3\n" +
+          "-------------------------------\n" +
+          "0,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-0\n" +
+          "1,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-1\n" +
+          "10,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-10\n" +
+          "11,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-11\n" +
+          "12,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-12\n" +
+          "13,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-13\n" +
+          "14,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-14\n" +
+          "15,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-15\n" +
+          "16,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-16\n" +
+          "17,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-17\n" +
+          "18,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-18\n" +
+          "19,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-19\n" +
+          "2,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-2\n" +
+          "20,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-20\n" +
+          "3,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-3\n" +
+          "4,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-4\n" +
+          "5,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-5\n" +
+          "6,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-6\n" +
+          "7,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-7\n" +
+          "8,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-8\n" +
+          "9,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-9\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * Negative test: inserting an int4 column into a text-mapped HBase column
+   * must be rejected. Asserts the failure message mentions the column-type
+   * mismatch.
+   */
+  @Test
+  public void testInsertIntoDifferentType() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    // 'id' is INT4 while the target 'rk' column is TEXT -- the mismatch under test.
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    try {
+      executeString("insert into hbase_mapped_table " +
+          "select id, name from base_table ").close();
+      fail("If inserting data type different with target table data type, should throw exception");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().indexOf("is different column type with") >= 0);
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
+
+  /**
+   * INSERT INTO a table whose row key spans two fields ('0:key,1:key' with a
+   * '_' delimiter), sourcing from default.lineitem, then verifies the raw
+   * HBase contents via a Scan over all three column families.
+   */
+  @Test
+  public void testInsertIntoRowField() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "'hbase.rowkey.delimiter'='_', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // BUGFIX: the ResultSet returned by executeString() was never closed here,
+    // unlike every sibling test; close it to avoid leaking the client resource.
+    executeString("insert into hbase_mapped_table " +
+        "select l_orderkey::text, l_partkey::text, l_shipdate, l_returnflag, l_suppkey::text from default.lineitem ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), Bytes.toBytes(""), Bytes.toBytes("b")},
+          new boolean[]{false, false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * CREATE TABLE ... AS SELECT into an HBase-mapped table (the method name
+   * "CATS" appears to be a typo of "CTAS"; kept as-is since the name is the
+   * test's identity). Builds a CSV base table, creates the pre-split HBase
+   * table with an AS-SELECT clause, then verifies the raw HBase contents.
+   */
+  @Test
+  public void testCATS() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
+        " as " +
+        "select id, name from base_table"
+    ).close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * INSERT INTO with the session variable INSERT_PUT_MODE enabled, so rows
+   * are written through the HBase Put API instead of bulk-loading HFiles.
+   * The session variable is unset again in the finally block.
+   */
+  @Test
+  public void testInsertIntoUsingPut() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    Map<String, String> sessions = new HashMap<String, String>();
+    sessions.put(HBaseStorageConstants.INSERT_PUT_MODE, "true");
+    client.updateSessionVariables(sessions);
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      executeString("insert into hbase_mapped_table " +
+          "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close();
+
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      // result is different from testInsertInto because l_orderkey is not unique.
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE));
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  /**
+   * INSERT INTO LOCATION targeting an HDFS path ('/tmp/hfile_test'): verifies
+   * that the expected partition directories are produced and that each
+   * contains at least one non-empty file.
+   */
+  @Test
+  public void testInsertIntoLocation() throws Exception {
+    String hostName = InetAddress.getLocalHost().getHostName();
+    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+    assertNotNull(zkPort);
+
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
+        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    try {
+      // create test table
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.TEXT);
+      schema.addColumn("name", Type.TEXT);
+      schema.addColumn("comment", Type.TEXT);
+      List<String> datas = new ArrayList<String>();
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 99; i >= 0; i--) {
+        datas.add(df.format(i) + "|value" + i + "|comment-" + i);
+      }
+      TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+          schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+      executeString("insert into location '/tmp/hfile_test' " +
+          "select id, name, comment from base_table ").close();
+
+      FileSystem fs = testingCluster.getDefaultFileSystem();
+      Path path = new Path("/tmp/hfile_test");
+      assertTrue(fs.exists(path));
+
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(2, files.length);
+
+      int index = 0;
+      for (FileStatus eachFile: files) {
+        assertEquals("/tmp/hfile_test/part-01-00000" + index + "-00" + index, eachFile.getPath().toUri().getPath());
+        for (FileStatus subFile: fs.listStatus(eachFile.getPath())) {
+          assertTrue(subFile.isFile());
+          assertTrue(subFile.getLen() > 0);
+        }
+        index++;
+      }
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
+
+  /**
+   * Serializes the rows of an HBase ResultScanner into a comparable string,
+   * one line per row: the row key first, then each requested column.
+   *
+   * The three parallel arrays describe how each logical column is read:
+   * cfNames[i] == null marks the row-key column (skipped in the per-column
+   * loop since the row key is emitted first); qualifiers[i] == null means
+   * "dump the whole family" as a {"qualifier": "value", ...} map; and
+   * binaries[i] selects binary vs. text deserialization (binaries[0] applies
+   * to the row key itself).
+   *
+   * @param scanner    scanner positioned at the start of the rows to dump
+   * @param cfNames    column family per logical column; null = row key
+   * @param qualifiers qualifier per logical column; null = whole family
+   * @param binaries   true = binary serde, false = text serde, per column
+   * @param schema     Tajo schema used to pick the target column type
+   * @return newline-separated rendering of every row in scan order
+   */
+  private String resultSetToString(ResultScanner scanner,
+                                   byte[][] cfNames, byte[][] qualifiers,
+                                   boolean[] binaries,
+                                   Schema schema) throws Exception {
+    StringBuilder sb = new StringBuilder();
+    Result result = null;
+    while ( (result = scanner.next()) != null ) {
+      if (binaries[0]) {
+        sb.append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(0), result.getRow()).asChar());
+      } else {
+        sb.append(new String(result.getRow()));
+      }
+
+      for (int i = 0; i < cfNames.length; i++) {
+        if (cfNames[i] == null) {
+          //rowkey
+          continue;
+        }
+        if (qualifiers[i] == null) {
+          // Whole-family dump: render all qualifier/value pairs as a map.
+          Map<byte[], byte[]> values = result.getFamilyMap(cfNames[i]);
+          if (values == null) {
+            sb.append(", null");
+          } else {
+            sb.append(", {");
+            String delim = "";
+            for (Map.Entry<byte[], byte[]> valueEntry: values.entrySet()) {
+              byte[] keyBytes = valueEntry.getKey();
+              byte[] valueBytes = valueEntry.getValue();
+
+              if (binaries[i]) {
+                // NOTE(review): qualifier keys are rendered via Bytes.toLong in
+                // binary mode — assumes 8-byte keys; confirm against callers.
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : Bytes.toLong(keyBytes)).append("\"");
+                sb.append(": \"").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
+              } else {
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : new String(keyBytes)).append("\"");
+                sb.append(": \"").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
+              }
+              delim = ", ";
+            }
+            sb.append("}");
+          }
+        } else {
+          // Single-cell column: fetch the exact family/qualifier value.
+          byte[] value = result.getValue(cfNames[i], qualifiers[i]);
+          if (value == null) {
+            sb.append(", null");
+          } else {
+            if (binaries[i]) {
+              sb.append(", ").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), value));
+            } else {
+              sb.append(", ").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), value));
+            }
+          }
+        }
+      }
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index c1cec2b..68b3fb3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -565,8 +565,8 @@ public class TestJoinBroadcast extends QueryTestCaseBase {
         }
         Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
         fileIndex++;
-        appender = StorageManager.getStorageManager(conf).getAppender(tableMeta, schema,
-            dataPath);
+        appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+            .getAppender(tableMeta, schema, dataPath);
         appender.init();
       }
       String[] columnDatas = rows[i].split("\\|");

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index 543c17a..632724b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -56,7 +56,7 @@ public class TestResultSet {
   private static TajoTestingCluster util;
   private static TajoConf conf;
   private static TableDesc desc;
-  private static StorageManager sm;
+  private static FileStorageManager sm;
   private static TableMeta scoreMeta;
   private static Schema scoreSchema;
 
@@ -64,7 +64,7 @@ public class TestResultSet {
   public static void setup() throws Exception {
     util = TpchTestBase.getInstance().getTestingCluster();
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
 
     scoreSchema = new Schema();
     scoreSchema.addColumn("deptname", Type.TEXT);
@@ -74,8 +74,7 @@ public class TestResultSet {
 
     Path p = sm.getTablePath("score");
     sm.getFileSystem().mkdirs(p);
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(scoreMeta, scoreSchema,
-        new Path(p, "score"));
+    Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score"));
     appender.init();
     int deptSize = 100;
     int tupleNum = 10000;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 067c6c8..712243b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -82,7 +82,7 @@ public class TestExecutionBlockCursor {
     logicalPlanner = new LogicalPlanner(catalog);
     optimizer = new LogicalOptimizer(conf);
 
-    StorageManager sm  = StorageManager.getStorageManager(conf);
+    StorageManager sm  = StorageManager.getFileStorageManager(conf);
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/storage/TestFileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestFileFragment.java
index e6ff7a3..d0ab1c0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestFileFragment.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestFileFragment.java
@@ -47,7 +47,7 @@ public class TestFileFragment {
     assertEquals("table1_1", fragment1.getTableName());
     assertEquals(new Path(path, "table0"), fragment1.getPath());
     assertTrue(0 == fragment1.getStartKey());
-    assertTrue(500 == fragment1.getEndKey());
+    assertTrue(500 == fragment1.getLength());
   }
 
   @Test
@@ -58,7 +58,7 @@ public class TestFileFragment {
     assertEquals("table1_1", fragment1.getTableName());
     assertEquals(new Path(path, "table0"), fragment1.getPath());
     assertTrue(0 == fragment1.getStartKey());
-    assertTrue(500 == fragment1.getEndKey());
+    assertTrue(500 == fragment1.getLength());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 742b07f..f36ff24 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -69,7 +69,8 @@ public class TestRowFile {
 
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.ROWFILE);
 
-    StorageManager sm = StorageManager.getStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR)));
+    FileStorageManager sm =
+        (FileStorageManager)StorageManager.getFileStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR)));
 
     Path tablePath = new Path("/test");
     Path metaPath = new Path(tablePath, ".meta");
@@ -79,7 +80,7 @@ public class TestRowFile {
 
     FileUtil.writeProto(fs, metaPath, meta.getProto());
 
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, dataPath);
+    Appender appender = sm.getAppender(meta, schema, dataPath);
     appender.enableStats();
     appender.init();
 
@@ -110,7 +111,7 @@ public class TestRowFile {
 
     int tupleCnt = 0;
     start = System.currentTimeMillis();
-    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
     while ((tuple=scanner.next()) != null) {
       tupleCnt++;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 186a7f5..5a93538 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -90,7 +90,7 @@ public class TestRangeRetrieverHandler {
     catalog = util.getMiniCatalogCluster().getCatalog();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
@@ -120,7 +120,7 @@ public class TestRangeRetrieverHandler {
 
     Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv");
     fs.mkdirs(tableDir.getParent());
-    Appender appender = sm.getAppender(employeeMeta, schema, tableDir);
+    Appender appender = ((FileStorageManager)sm).getAppender(employeeMeta, schema, tableDir);
     appender.init();
 
     Tuple tuple = new VTuple(schema.size());
@@ -145,7 +145,7 @@ public class TestRangeRetrieverHandler {
         tableDir.toUri());
     catalog.createTable(employee);
 
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employeeMeta, tableDir, Integer.MAX_VALUE);
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tableDir, Integer.MAX_VALUE);
 
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
         LocalTajoTestingUtility.newQueryUnitAttemptId(),
@@ -155,7 +155,7 @@ public class TestRangeRetrieverHandler {
     LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     ExternalSortExec sort = null;
@@ -169,7 +169,7 @@ public class TestRangeRetrieverHandler {
     }
 
     SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
-    RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sm, sort, sort.getSchema(),
+    RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort, sort.getSchema(),
         sort.getSchema(), sortSpecs);
 
     exec = idxStoreExec;
@@ -185,7 +185,7 @@ public class TestRangeRetrieverHandler {
     reader.open();
 
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema,
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, meta, schema,
         StorageUtil.concatPath(testDir, "output", "output"));
 
     scanner.init();
@@ -245,7 +245,7 @@ public class TestRangeRetrieverHandler {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = sm.getAppender(meta, schema, tablePath);
+    Appender appender = ((FileStorageManager)sm).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) {
@@ -268,7 +268,7 @@ public class TestRangeRetrieverHandler {
         CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta, tablePath.toUri());
     catalog.createTable(employee);
 
-    FileFragment[] frags = sm.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
 
     TaskAttemptContext
         ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -279,7 +279,7 @@ public class TestRangeRetrieverHandler {
     LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     ExternalSortExec sort = null;
@@ -293,7 +293,7 @@ public class TestRangeRetrieverHandler {
     }
 
     SortSpec[] sortSpecs = sort.getPlan().getSortKeys();
-    RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sm, sort,
+    RangeShuffleFileWriteExec idxStoreExec = new RangeShuffleFileWriteExec(ctx, sort,
         sort.getSchema(), sort.getSchema(), sortSpecs);
 
     exec = idxStoreExec;
@@ -308,7 +308,7 @@ public class TestRangeRetrieverHandler {
         new Path(testDir, "output/index"), keySchema, comp);
     reader.open();
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, outputMeta, schema,
+    SeekableScanner scanner = FileStorageManager.getSeekableScanner(conf, outputMeta, schema,
         StorageUtil.concatPath(testDir, "output", "output"));
     scanner.init();
     int cnt = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data b/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data
new file mode 100644
index 0000000..417d480
--- /dev/null
+++ b/tajo-core/src/test/resources/dataset/TestHBaseTable/splits.data
@@ -0,0 +1,4 @@
+010
+040
+060
+080
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result b/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result
new file mode 100644
index 0000000..8d50bf1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testBinaryMappedQuery.result
@@ -0,0 +1,81 @@
+rk,col1,col2,col3
+-------------------------------
+21,a-21,{"k1":"k1-21", "k2":"k2-21"},21
+22,a-22,{"k1":"k1-22", "k2":"k2-22"},22
+23,a-23,{"k1":"k1-23", "k2":"k2-23"},23
+24,a-24,{"k1":"k1-24", "k2":"k2-24"},24
+25,a-25,{"k1":"k1-25", "k2":"k2-25"},25
+26,a-26,{"k1":"k1-26", "k2":"k2-26"},26
+27,a-27,{"k1":"k1-27", "k2":"k2-27"},27
+28,a-28,{"k1":"k1-28", "k2":"k2-28"},28
+29,a-29,{"k1":"k1-29", "k2":"k2-29"},29
+30,a-30,{"k1":"k1-30", "k2":"k2-30"},30
+31,a-31,{"k1":"k1-31", "k2":"k2-31"},31
+32,a-32,{"k1":"k1-32", "k2":"k2-32"},32
+33,a-33,{"k1":"k1-33", "k2":"k2-33"},33
+34,a-34,{"k1":"k1-34", "k2":"k2-34"},34
+35,a-35,{"k1":"k1-35", "k2":"k2-35"},35
+36,a-36,{"k1":"k1-36", "k2":"k2-36"},36
+37,a-37,{"k1":"k1-37", "k2":"k2-37"},37
+38,a-38,{"k1":"k1-38", "k2":"k2-38"},38
+39,a-39,{"k1":"k1-39", "k2":"k2-39"},39
+40,a-40,{"k1":"k1-40", "k2":"k2-40"},40
+41,a-41,{"k1":"k1-41", "k2":"k2-41"},41
+42,a-42,{"k1":"k1-42", "k2":"k2-42"},42
+43,a-43,{"k1":"k1-43", "k2":"k2-43"},43
+44,a-44,{"k1":"k1-44", "k2":"k2-44"},44
+45,a-45,{"k1":"k1-45", "k2":"k2-45"},45
+46,a-46,{"k1":"k1-46", "k2":"k2-46"},46
+47,a-47,{"k1":"k1-47", "k2":"k2-47"},47
+48,a-48,{"k1":"k1-48", "k2":"k2-48"},48
+49,a-49,{"k1":"k1-49", "k2":"k2-49"},49
+50,a-50,{"k1":"k1-50", "k2":"k2-50"},50
+51,a-51,{"k1":"k1-51", "k2":"k2-51"},51
+52,a-52,{"k1":"k1-52", "k2":"k2-52"},52
+53,a-53,{"k1":"k1-53", "k2":"k2-53"},53
+54,a-54,{"k1":"k1-54", "k2":"k2-54"},54
+55,a-55,{"k1":"k1-55", "k2":"k2-55"},55
+56,a-56,{"k1":"k1-56", "k2":"k2-56"},56
+57,a-57,{"k1":"k1-57", "k2":"k2-57"},57
+58,a-58,{"k1":"k1-58", "k2":"k2-58"},58
+59,a-59,{"k1":"k1-59", "k2":"k2-59"},59
+60,a-60,{"k1":"k1-60", "k2":"k2-60"},60
+61,a-61,{"k1":"k1-61", "k2":"k2-61"},61
+62,a-62,{"k1":"k1-62", "k2":"k2-62"},62
+63,a-63,{"k1":"k1-63", "k2":"k2-63"},63
+64,a-64,{"k1":"k1-64", "k2":"k2-64"},64
+65,a-65,{"k1":"k1-65", "k2":"k2-65"},65
+66,a-66,{"k1":"k1-66", "k2":"k2-66"},66
+67,a-67,{"k1":"k1-67", "k2":"k2-67"},67
+68,a-68,{"k1":"k1-68", "k2":"k2-68"},68
+69,a-69,{"k1":"k1-69", "k2":"k2-69"},69
+70,a-70,{"k1":"k1-70", "k2":"k2-70"},70
+71,a-71,{"k1":"k1-71", "k2":"k2-71"},71
+72,a-72,{"k1":"k1-72", "k2":"k2-72"},72
+73,a-73,{"k1":"k1-73", "k2":"k2-73"},73
+74,a-74,{"k1":"k1-74", "k2":"k2-74"},74
+75,a-75,{"k1":"k1-75", "k2":"k2-75"},75
+76,a-76,{"k1":"k1-76", "k2":"k2-76"},76
+77,a-77,{"k1":"k1-77", "k2":"k2-77"},77
+78,a-78,{"k1":"k1-78", "k2":"k2-78"},78
+79,a-79,{"k1":"k1-79", "k2":"k2-79"},79
+80,a-80,{"k1":"k1-80", "k2":"k2-80"},80
+81,a-81,{"k1":"k1-81", "k2":"k2-81"},81
+82,a-82,{"k1":"k1-82", "k2":"k2-82"},82
+83,a-83,{"k1":"k1-83", "k2":"k2-83"},83
+84,a-84,{"k1":"k1-84", "k2":"k2-84"},84
+85,a-85,{"k1":"k1-85", "k2":"k2-85"},85
+86,a-86,{"k1":"k1-86", "k2":"k2-86"},86
+87,a-87,{"k1":"k1-87", "k2":"k2-87"},87
+88,a-88,{"k1":"k1-88", "k2":"k2-88"},88
+89,a-89,{"k1":"k1-89", "k2":"k2-89"},89
+90,a-90,{"k1":"k1-90", "k2":"k2-90"},90
+91,a-91,{"k1":"k1-91", "k2":"k2-91"},91
+92,a-92,{"k1":"k1-92", "k2":"k2-92"},92
+93,a-93,{"k1":"k1-93", "k2":"k2-93"},93
+94,a-94,{"k1":"k1-94", "k2":"k2-94"},94
+95,a-95,{"k1":"k1-95", "k2":"k2-95"},95
+96,a-96,{"k1":"k1-96", "k2":"k2-96"},96
+97,a-97,{"k1":"k1-97", "k2":"k2-97"},97
+98,a-98,{"k1":"k1-98", "k2":"k2-98"},98
+99,a-99,{"k1":"k1-99", "k2":"k2-99"},99
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result b/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result
new file mode 100644
index 0000000..72013f2
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result
@@ -0,0 +1,100 @@
+000, value0
+001, value1
+002, value2
+003, value3
+004, value4
+005, value5
+006, value6
+007, value7
+008, value8
+009, value9
+010, value10
+011, value11
+012, value12
+013, value13
+014, value14
+015, value15
+016, value16
+017, value17
+018, value18
+019, value19
+020, value20
+021, value21
+022, value22
+023, value23
+024, value24
+025, value25
+026, value26
+027, value27
+028, value28
+029, value29
+030, value30
+031, value31
+032, value32
+033, value33
+034, value34
+035, value35
+036, value36
+037, value37
+038, value38
+039, value39
+040, value40
+041, value41
+042, value42
+043, value43
+044, value44
+045, value45
+046, value46
+047, value47
+048, value48
+049, value49
+050, value50
+051, value51
+052, value52
+053, value53
+054, value54
+055, value55
+056, value56
+057, value57
+058, value58
+059, value59
+060, value60
+061, value61
+062, value62
+063, value63
+064, value64
+065, value65
+066, value66
+067, value67
+068, value68
+069, value69
+070, value70
+071, value71
+072, value72
+073, value73
+074, value74
+075, value75
+076, value76
+077, value77
+078, value78
+079, value79
+080, value80
+081, value81
+082, value82
+083, value83
+084, value84
+085, value85
+086, value86
+087, value87
+088, value88
+089, value89
+090, value90
+091, value91
+092, value92
+093, value93
+094, value94
+095, value95
+096, value96
+097, value97
+098, value98
+099, value99

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/resources/results/TestHBaseTable/testColumnKeyValueSelectQuery.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testColumnKeyValueSelectQuery.result b/tajo-core/src/test/resources/results/TestHBaseTable/testColumnKeyValueSelectQuery.result
new file mode 100644
index 0000000..82d4fd2
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testColumnKeyValueSelectQuery.result
@@ -0,0 +1,12 @@
+rk1,col2_key,col2_value,col3
+-------------------------------
+rk-0,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-0
+rk-1,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-1
+rk-2,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-2
+rk-3,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-3
+rk-4,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-4
+rk-5,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-5
+rk-6,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-6
+rk-7,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-7
+rk-8,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-8
+rk-9,["key-0", "key-1", "key-2", "key-3", "key-4"],["value-0", "value-1", "value-2", "value-3", "value-4"],col3-value-9
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/resources/results/TestHBaseTable/testIndexPredication.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testIndexPredication.result b/tajo-core/src/test/resources/results/TestHBaseTable/testIndexPredication.result
new file mode 100644
index 0000000..f38d238
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testIndexPredication.result
@@ -0,0 +1,38 @@
+rk,col1,col2,col3
+-------------------------------
+020,a-20,{"k1":"k1-20", "k2":"k2-20"},b-20
+021,a-21,{"k1":"k1-21", "k2":"k2-21"},b-21
+022,a-22,{"k1":"k1-22", "k2":"k2-22"},b-22
+023,a-23,{"k1":"k1-23", "k2":"k2-23"},b-23
+024,a-24,{"k1":"k1-24", "k2":"k2-24"},b-24
+025,a-25,{"k1":"k1-25", "k2":"k2-25"},b-25
+026,a-26,{"k1":"k1-26", "k2":"k2-26"},b-26
+027,a-27,{"k1":"k1-27", "k2":"k2-27"},b-27
+028,a-28,{"k1":"k1-28", "k2":"k2-28"},b-28
+029,a-29,{"k1":"k1-29", "k2":"k2-29"},b-29
+030,a-30,{"k1":"k1-30", "k2":"k2-30"},b-30
+031,a-31,{"k1":"k1-31", "k2":"k2-31"},b-31
+032,a-32,{"k1":"k1-32", "k2":"k2-32"},b-32
+033,a-33,{"k1":"k1-33", "k2":"k2-33"},b-33
+034,a-34,{"k1":"k1-34", "k2":"k2-34"},b-34
+035,a-35,{"k1":"k1-35", "k2":"k2-35"},b-35
+036,a-36,{"k1":"k1-36", "k2":"k2-36"},b-36
+037,a-37,{"k1":"k1-37", "k2":"k2-37"},b-37
+038,a-38,{"k1":"k1-38", "k2":"k2-38"},b-38
+039,a-39,{"k1":"k1-39", "k2":"k2-39"},b-39
+040,a-40,{"k1":"k1-40", "k2":"k2-40"},b-40
+041,a-41,{"k1":"k1-41", "k2":"k2-41"},b-41
+042,a-42,{"k1":"k1-42", "k2":"k2-42"},b-42
+043,a-43,{"k1":"k1-43", "k2":"k2-43"},b-43
+044,a-44,{"k1":"k1-44", "k2":"k2-44"},b-44
+045,a-45,{"k1":"k1-45", "k2":"k2-45"},b-45
+046,a-46,{"k1":"k1-46", "k2":"k2-46"},b-46
+047,a-47,{"k1":"k1-47", "k2":"k2-47"},b-47
+048,a-48,{"k1":"k1-48", "k2":"k2-48"},b-48
+049,a-49,{"k1":"k1-49", "k2":"k2-49"},b-49
+050,a-50,{"k1":"k1-50", "k2":"k2-50"},b-50
+051,a-51,{"k1":"k1-51", "k2":"k2-51"},b-51
+052,a-52,{"k1":"k1-52", "k2":"k2-52"},b-52
+053,a-53,{"k1":"k1-53", "k2":"k2-53"},b-53
+054,a-54,{"k1":"k1-54", "k2":"k2-54"},b-54
+055,a-55,{"k1":"k1-55", "k2":"k2-55"},b-55
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/resources/results/TestHBaseTable/testInsertInto.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testInsertInto.result b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertInto.result
new file mode 100644
index 0000000..e0c97ef
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertInto.result
@@ -0,0 +1,3 @@
+1, 1996-03-13, {"": "N"}, 7706
+2, 1997-01-28, {"": "N"}, 1191
+3, 1994-02-02, {"": "R"}, 1798

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoBinaryMultiRegion.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoBinaryMultiRegion.result b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoBinaryMultiRegion.result
new file mode 100644
index 0000000..c55873a
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoBinaryMultiRegion.result
@@ -0,0 +1,100 @@
+0, value0
+1, value1
+2, value2
+3, value3
+4, value4
+5, value5
+6, value6
+7, value7
+8, value8
+9, value9
+1, value10
+1, value11
+1, value12
+1, value13
+1, value14
+1, value15
+1, value16
+1, value17
+1, value18
+1, value19
+2, value20
+2, value21
+2, value22
+2, value23
+2, value24
+2, value25
+2, value26
+2, value27
+2, value28
+2, value29
+3, value30
+3, value31
+3, value32
+3, value33
+3, value34
+3, value35
+3, value36
+3, value37
+3, value38
+3, value39
+4, value40
+4, value41
+4, value42
+4, value43
+4, value44
+4, value45
+4, value46
+4, value47
+4, value48
+4, value49
+5, value50
+5, value51
+5, value52
+5, value53
+5, value54
+5, value55
+5, value56
+5, value57
+5, value58
+5, value59
+6, value60
+6, value61
+6, value62
+6, value63
+6, value64
+6, value65
+6, value66
+6, value67
+6, value68
+6, value69
+7, value70
+7, value71
+7, value72
+7, value73
+7, value74
+7, value75
+7, value76
+7, value77
+7, value78
+7, value79
+8, value80
+8, value81
+8, value82
+8, value83
+8, value84
+8, value85
+8, value86
+8, value87
+8, value88
+8, value89
+9, value90
+9, value91
+9, value92
+9, value93
+9, value94
+9, value95
+9, value96
+9, value97
+9, value98
+9, value99