You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/14 16:30:14 UTC

[35/51] [partial] tajo git commit: TAJO-1761: Separate an integration unit test kit into an independent module.

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
new file mode 100644
index 0000000..4ff5e3e
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -0,0 +1,1408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.hbase.*;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.TUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.sql.ResultSet;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@Category(IntegrationTest.class)
+public class TestHBaseTable extends QueryTestCaseBase {
+  private static final Log LOG = LogFactory.getLog(TestHBaseTable.class);
+
+  private static String tableSpaceUri;
+  private static String hostName,zkPort;
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    // Start a mini HBase cluster and register an "hbase:zk://host:port"
+    // tablespace named "cluster1" that every test below creates tables in.
+    try {
+      testingCluster.getHBaseUtil().startHBaseCluster();
+      hostName = InetAddress.getLocalHost().getHostName();
+      zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
+      assertNotNull(hostName);
+      assertNotNull(zkPort);
+    } catch (Exception e) {
+      // Fail fast instead of swallowing: continuing with a null host/port would
+      // build a bogus tablespace URI ("hbase:zk://null:null") and every test
+      // would then fail with a confusing downstream error.
+      throw new IOException("Failed to start the HBase mini cluster", e);
+    }
+
+    tableSpaceUri = "hbase:zk://" + hostName + ":" + zkPort;
+    HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
+    hBaseTablespace.init(new TajoConf(testingCluster.getHBaseUtil().getConf()));
+    TablespaceManager.addTableSpaceForTest(hBaseTablespace);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    try {
+      testingCluster.getHBaseUtil().stopHBaseCluster();
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testVerifyCreateHBaseTableRequiredMeta() throws Exception {
+    // An hbase-backed table must declare both the 'table' and 'columns'
+    // properties; creation without either must be rejected.
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) TABLESPACE cluster1 USING hbase").close();
+      fail("hbase table must have 'table' meta");
+    } catch (TajoException e) {
+      // JUnit convention: expected value first, actual second.
+      assertEquals(ResultCode.MISSING_TABLE_PROPERTY, e.getErrorCode());
+    }
+
+    try {
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) TABLESPACE cluster1 " +
+          "USING hbase " +
+          "WITH ('table'='hbase_table')").close();
+
+      fail("hbase table must have 'columns' meta");
+    } catch (TajoException e) {
+      assertEquals(ResultCode.MISSING_TABLE_PROPERTY, e.getErrorCode());
+    }
+  }
+
+  @Test
+  public void testCreateHBaseTable() throws Exception {
+    // Creating a managed table creates the backing HBase table. Columns mapped
+    // to the row key contribute no column family, so only two families appear.
+    executeString(
+        "CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b')").close();
+
+    assertTableExists("hbase_mapped_table1");
+
+    HTableDescriptor descriptor = testingCluster.getHBaseUtil().getTableDescriptor("hbase_table");
+    assertNotNull(descriptor);
+    assertEquals("hbase_table", descriptor.getNameAsString());
+
+    // col1 is mapped to the row key, leaving families col2 and col3.
+    HColumnDescriptor[] families = descriptor.getColumnFamilies();
+    assertEquals(2, families.length);
+    assertEquals("col2", families[0].getNameAsString());
+    assertEquals("col3", families[1].getNameAsString());
+
+    executeString("DROP TABLE hbase_mapped_table1 PURGE").close();
+
+    // PURGE on a managed table must remove the underlying HBase table as well.
+    HBaseAdmin admin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    try {
+      assertFalse(admin.tableExists("hbase_table"));
+    } finally {
+      admin.close();
+    }
+  }
+
+  @Test
+  public void testCreateNotExistsExternalHBaseTable() throws Exception {
+    // An EXTERNAL hbase table must reference an HBase table that already
+    // exists; creating one against a missing table must be rejected.
+    String sql = String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b') " +
+            "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+    try {
+      executeString(sql).close();
+      fail("External table should be a existed table.");
+    } catch (Exception e) {
+      // Catch Exception, NOT Throwable: catching Throwable would also trap the
+      // AssertionError thrown by fail() above, whose message matches the text
+      // asserted below, making the test pass even when the CREATE succeeded.
+      assertTrue(e.getMessage().contains("External table should be a existed table."));
+    }
+  }
+
+  @Test
+  public void testCreateRowFieldWithNonText() throws Exception {
+    // A column mapped to the row key (or a delimited part of it) must be TEXT;
+    // rk1 is int4 here and the CREATE must therefore fail.
+    try {
+      executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, col3 text, col4 text) " +
+          "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " +
+          "'hbase.rowkey.delimiter'='_')").close();
+      fail("Key field type should be TEXT type");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Key field type should be TEXT type"));
+    }
+  }
+
+  @Test
+  public void testCreateExternalHBaseTable() throws Exception {
+    // Pre-create the backing HBase table; an external mapping may then refer to it.
+    HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("external_hbase_table_not_purge"));
+    for (String family : new String[]{"col1", "col2", "col3"}) {
+      descriptor.addFamily(new HColumnDescriptor(family));
+    }
+    testingCluster.getHBaseUtil().createTable(descriptor);
+
+    executeString(String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b') " +
+        "LOCATION '%s/external_hbase_table'", tableSpaceUri)).close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    executeString("DROP TABLE external_hbase_mapped_table").close();
+
+    // DROP without PURGE must leave the HBase table in place; verify, then
+    // delete it manually so later tests start clean.
+    HBaseAdmin admin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    try {
+      assertTrue(admin.tableExists("external_hbase_table_not_purge"));
+      admin.disableTable("external_hbase_table_not_purge");
+      admin.deleteTable("external_hbase_table_not_purge");
+    } finally {
+      admin.close();
+    }
+  }
+
+  @Test
+  public void testSimpleSelectQuery() throws Exception {
+    // Back the external mapping with a real HBase table holding col1..col3.
+    HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    for (String family : new String[]{"col1", "col2", "col3"}) {
+      descriptor.addFamily(new HColumnDescriptor(family));
+    }
+    testingCluster.getHBaseUtil().createTable(descriptor);
+
+    executeString(String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " +
+        "LOCATION '%s/external_hbase_table'", tableSpaceUri)).close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
+    HTableInterface htable = space.getConnection().getTable("external_hbase_table");
+
+    try {
+      // Seed 100 rows keyed "0".."99"; the query below reads a subset of them.
+      for (int row = 0; row < 100; row++) {
+        Put put = new Put(String.valueOf(row).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + row).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + row).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + row).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + row).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + row).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testBinaryMappedQuery() throws Exception {
+    // rk and col3 use the '#b' suffix: binary-encoded int8/int4 cell values.
+    HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    for (String family : new String[]{"col1", "col2", "col3"}) {
+      descriptor.addFamily(new HColumnDescriptor(family));
+    }
+    testingCluster.getHBaseUtil().createTable(descriptor);
+
+    executeString(String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key#b,col1:a,col2:,col3:b#b') " +
+        "LOCATION '%s/external_hbase_table'", tableSpaceUri)).close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
+    HTableInterface htable = space.getConnection().getTable("external_hbase_table");
+
+    try {
+      // 100 rows with binary long row keys and binary int col3 values.
+      for (int row = 0; row < 100; row++) {
+        Put put = new Put(Bytes.toBytes((long) row));
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + row).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + row).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + row).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + row).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes(row));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk > 20");
+      assertResultSet(res);
+      res.close();
+
+      // Projection with reordered columns over the last few rows.
+      res = executeString("select col3, col2, rk from external_hbase_mapped_table where rk > 95");
+      String expected = "col3,col2,rk\n" +
+          "-------------------------------\n" +
+          "96,{\"k1\":\"k1-96\", \"k2\":\"k2-96\"},96\n" +
+          "97,{\"k1\":\"k1-97\", \"k2\":\"k2-97\"},97\n" +
+          "98,{\"k1\":\"k1-98\", \"k2\":\"k2-98\"},98\n" +
+          "99,{\"k1\":\"k1-99\", \"k2\":\"k2-99\"},99\n";
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testColumnKeyValueSelectQuery() throws Exception {
+    // 'col2:key:' / 'col2:value:' expose the qualifier names and cell values
+    // of the col2 family as separate Tajo columns.
+    HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    descriptor.addFamily(new HColumnDescriptor("col2"));
+    descriptor.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(descriptor);
+
+    executeString(String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
+        "'hbase.rowkey.delimiter'='_') LOCATION '%s/external_hbase_table'", tableSpaceUri)).close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
+    HTableInterface htable = space.getConnection().getTable("external_hbase_table");
+
+    try {
+      // Ten rows, each with five col2 qualifier/value pairs plus one col3 cell.
+      for (int row = 0; row < 10; row++) {
+        Put put = new Put(Bytes.toBytes("rk-" + row));
+        for (int pair = 0; pair < 5; pair++) {
+          put.add("col2".getBytes(), ("key-" + pair).getBytes(), Bytes.toBytes("value-" + pair));
+        }
+        put.add("col3".getBytes(), "".getBytes(), ("col3-value-" + row).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 >= 'rk-0'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testRowFieldSelectQuery() throws Exception {
+    // '0:key' / '1:key' map the two '_'-delimited parts of the row key to rk1/rk2.
+    HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
+    descriptor.addFamily(new HColumnDescriptor("col3"));
+    testingCluster.getHBaseUtil().createTable(descriptor);
+
+    executeString(String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, rk2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'='0:key,1:key,col3:a', " +
+        "'hbase.rowkey.delimiter'='_') LOCATION '%s/external_hbase_table'", tableSpaceUri)).close();
+
+    assertTableExists("external_hbase_mapped_table");
+
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
+    HTableInterface htable = space.getConnection().getTable("external_hbase_table");
+
+    try {
+      // Composite row keys of the form "field1-N_field2-N".
+      for (int row = 0; row < 100; row++) {
+        Put put = new Put(("field1-" + row + "_field2-" + row).getBytes());
+        put.add("col3".getBytes(), "a".getBytes(), ("a-" + row).getBytes());
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 > 'field1-20'");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
+      htable.close();
+    }
+  }
+
+  @Test
+  public void testIndexPredication() throws Exception {
+    // Table is pre-split into 5 regions; row-key predicates must be pushed
+    // down to region-aligned scan ranges (checked by assertIndexPredication).
+    String sql =
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
+        "'hbase.split.rowkeys'='010,040,060,080') ";
+    executeString(sql).close();
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    // Assert (not merely call) that the backing HBase table was created:
+    // the original discarded this boolean, checking nothing.
+    assertTrue(hAdmin.tableExists("hbase_table"));
+
+    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+    try {
+      // 4 split keys => 5 regions.
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+      assertIndexPredication(false);
+
+      ResultSet res = executeString("select * from hbase_mapped_table where rk >= '020' and rk <= '055'");
+      assertResultSet(res);
+      res.close();
+
+      res = executeString("select * from hbase_mapped_table where rk = '021'");
+      String expected = "rk,col1,col2,col3\n" +
+          "-------------------------------\n" +
+          "021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      htable.close();
+      hAdmin.close();
+    }
+  }
+
+  @Test
+  public void testCompositeRowIndexPredication() throws Exception {
+    // Composite ('_'-delimited) row keys over a 5-region pre-split table;
+    // predicate pushdown on the first key part is verified below.
+    executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "'hbase.split.rowkeys'='010,040,060,080', " +
+        "'hbase.rowkey.delimiter'='_')").close();
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    // Assert (not merely call) that the backing HBase table was created:
+    // the original discarded this boolean, checking nothing.
+    assertTrue(hAdmin.tableExists("hbase_table"));
+
+    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+    try {
+      // 4 split keys => 5 regions.
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        htable.put(put);
+      }
+
+      // Sanity check the raw HBase scan range used for "rk = '021'":
+      // ["021", "021_" + U+FFFF] must find exactly the row "021_021" first.
+      Scan scan = new Scan();
+      scan.setStartRow("021".getBytes());
+      scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes());
+      Filter filter = new InclusiveStopFilter(scan.getStopRow());
+      scan.setFilter(filter);
+
+      ResultScanner scanner = htable.getScanner(scan);
+      Result result = scanner.next();
+      assertNotNull(result);
+      assertEquals("021_021", new String(result.getRow()));
+      scanner.close();
+
+      assertIndexPredication(true);
+
+      ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'");
+      String expected = "rk,rk2,col1,col2,col3\n" +
+          "-------------------------------\n" +
+          "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
+
+      assertEquals(expected, resultSetToString(res));
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      htable.close();
+      hAdmin.close();
+    }
+  }
+
+  /**
+   * Verifies that row-key predicates on "rk" are translated into the expected
+   * scan-range fragments by {@code Tablespace#getSplits} for the
+   * "hbase_mapped_table" created by the caller (split keys 010/040/060/080).
+   *
+   * @param isCompositeRowKey when true, stop rows carry a "_\uFFFF" suffix so
+   *                          that an equality predicate on the first key part
+   *                          still covers every composite row sharing it
+   */
+  private void assertIndexPredication(boolean isCompositeRowKey) throws Exception {
+    String postFix = isCompositeRowKey ? "_" + new String(new char[]{Character.MAX_VALUE}) : "";
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    ScanNode scanNode = new ScanNode(1);
+
+    // where rk = '021'
+    // Point predicate => exactly one fragment covering just that key.
+    EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("021")));
+    scanNode.setQual(evalNodeEq);
+    Tablespace tablespace = TablespaceManager.getByName("cluster1").get();
+    List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(1, fragments.size());
+    assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
+    assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
+
+    // where rk >= '020' and rk <= '055'
+    // Range crosses the '040' region boundary => split into two fragments.
+    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("020")));
+    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("055")));
+    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+    scanNode.setQual(evalNodeA);
+
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(2, fragments.size());
+    HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    // where (rk >= '020' and rk <= '055') or rk = '075'
+    // Disjoint OR branch adds a third, separate point fragment.
+    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("075")));
+    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+    scanNode.setQual(evalNodeB);
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(3, fragments.size());
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
+    assertEquals("075", new String(fragment3.getStartRow()));
+    assertEquals("075" + postFix, new String(fragment3.getStopRow()));
+
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
+    // Two disjoint ranges (the second within one region) => three fragments.
+    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("072")));
+    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("078")));
+    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(3, fragments.size());
+
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
+
+    fragment3 = (HBaseFragment) fragments.get(2);
+    assertEquals("072", new String(fragment3.getStartRow()));
+    assertEquals("078" + postFix, new String(fragment3.getStopRow()));
+
+    // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
+    // Both ranges fall in the same region span, so they merge: the second
+    // fragment's stop row extends to '059' and only two fragments remain.
+    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("057")));
+    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
+        new ConstEval(new TextDatum("059")));
+    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    assertEquals(2, fragments.size());
+
+    fragment1 = (HBaseFragment) fragments.get(0);
+    assertEquals("020", new String(fragment1.getStartRow()));
+    assertEquals("040", new String(fragment1.getStopRow()));
+
+    fragment2 = (HBaseFragment) fragments.get(1);
+    assertEquals("040", new String(fragment2.getStartRow()));
+    assertEquals("059" + postFix, new String(fragment2.getStopRow()));
+  }
+
+  @Test
+  public void testNonForwardQuery() throws Exception {
+    // Full scan (no predicate) over a pre-split, binary-int col3 table.
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    HTable htable = null;
+    try {
+      // Assert the backing table exists (the original discarded this boolean).
+      assertTrue(hAdmin.tableExists("hbase_table"));
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select * from hbase_mapped_table");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      hAdmin.close();
+      // BUG FIX: the original tested 'htable == null' before calling close(),
+      // guaranteeing an NPE when htable was null and a leaked connection when
+      // it was not. Close only a table that was actually opened (cf. testJoin).
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testJoin() throws Exception {
+    // Join an hbase-mapped table against the default.lineitem fixture on the
+    // binary int8 column col3.
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int8) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
+
+    assertTableExists("hbase_mapped_table");
+    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+    HTable htable = null;
+    try {
+      // Assert the backing table exists (the original discarded this boolean).
+      assertTrue(hAdmin.tableExists("hbase_table"));
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
+      assertEquals(5, keys.getFirst().length);
+
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 0; i < 100; i++) {
+        Put put = new Put(String.valueOf(df.format(i)).getBytes());
+        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
+        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
+        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
+        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
+        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes((long) i));
+        htable.put(put);
+      }
+
+      ResultSet res = executeString("select a.rk, a.col1, a.col2, a.col3, b.l_orderkey, b.l_linestatus " +
+          "from hbase_mapped_table a " +
+          "join default.lineitem b on a.col3 = b.l_orderkey");
+      assertResultSet(res);
+      res.close();
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      hAdmin.close();
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertInto() throws Exception {
+    // INSERT from default.lineitem, then read the raw HBase cells back.
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    executeString("insert into hbase_mapped_table " +
+        "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      for (String family : new String[]{"col1", "col2", "col3"}) {
+        scan.addFamily(Bytes.toBytes(family));
+      }
+      scanner = htable.getScanner(scan);
+
+      // Dump the scanned cells and compare against the expected-result file.
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertValues1() throws Exception {
+    // Insert four literal rows one by one, then read them back from HBase.
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    executeString("insert into hbase_mapped_table select 'aaa', 'a12', 'a34', 1").close();
+    executeString("insert into hbase_mapped_table select 'bbb', 'b12', 'b34', 2").close();
+    executeString("insert into hbase_mapped_table select 'ccc', 'c12', 'c34', 3").close();
+    executeString("insert into hbase_mapped_table select 'ddd', 'd12', 'd34', 4").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      for (String family : new String[]{"col1", "col2", "col3"}) {
+        scan.addFamily(Bytes.toBytes(family));
+      }
+      scanner = htable.getScanner(scan);
+
+      // Dump the scanned cells and compare against the expected-result file.
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+      if (scanner != null) {
+        scanner.close();
+      }
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoMultiRegion() throws Exception {
+    // Pre-splits the HBase table into multiple regions via
+    // 'hbase.split.rowkeys' and verifies a bulk insert spanning the splits.
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    // Rows generated in descending order; zero-padded keys ("000".."099")
+    // keep the lexicographic HBase ordering aligned with the numeric one.
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoMultiRegion2() throws Exception {
+    // Same as testInsertIntoMultiRegion but with single-character split keys
+    // and non-padded row keys, exercising lexicographic key distribution.
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoMultiRegionWithSplitFile() throws Exception {
+    // Region split keys are read from an external file referenced by
+    // 'hbase.split.rowkeys.file' instead of being listed inline.
+    String splitFilePath = currentDatasetPath + "/splits.data";
+
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys.file'='" + splitFilePath + "')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoMultiRegionMultiRowFields() throws Exception {
+    // Two table columns (rk1, rk2) are mapped to the row key ('0:key','1:key')
+    // and joined with the '_' delimiter configured by 'hbase.rowkey.delimiter'.
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " +
+        "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " +
+        "'hbase.rowkey.delimiter'='_')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id1", Type.TEXT);
+    schema.addColumn("id2", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    DecimalFormat df = new DecimalFormat("000");
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|" + (i + 100) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id1, id2, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      // First two entries are null: both map to row-key parts, not families.
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, null, Bytes.toBytes("col1")},
+          new byte[][]{null, null, Bytes.toBytes("a")},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoBinaryMultiRegion() throws Exception {
+    // Row key is an INT4 stored in binary form (':key#b'); the first entry of
+    // the binaries flag array below is true accordingly.
+    executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select id, name from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{true, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoColumnKeyValue() throws Exception {
+    // Maps col2 as a key/value family ('col2:key:' / 'col2:value:') so that
+    // multiple base rows sharing a row key collapse into one HBase row whose
+    // col2 family carries all key/value pairs.
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
+        "'hbase.rowkey.delimiter'='_')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("rk", Type.TEXT);
+    schema.addColumn("col2_key", Type.TEXT);
+    schema.addColumn("col2_value", Type.TEXT);
+    schema.addColumn("col3", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    // Three key/value pairs (ck-0..ck-2) per row key.
+    for (int i = 20; i >= 0; i--) {
+      for (int j = 0; j < 3; j++) {
+        datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
+      }
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString("insert into hbase_mapped_table " +
+        "select rk, col2_key, col2_value, col3 from base_table ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, null, null},
+          new boolean[]{false, false, false}, tableDesc.getSchema()));
+
+      // Close the ResultSet even when an assertion fails; the previous code
+      // closed it only on the success path and leaked it otherwise.
+      ResultSet res = executeString("select * from hbase_mapped_table");
+      try {
+        String expected = "rk,col2_key,col2_value,col3\n" +
+            "-------------------------------\n" +
+            "0,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-0\n" +
+            "1,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-1\n" +
+            "10,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-10\n" +
+            "11,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-11\n" +
+            "12,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-12\n" +
+            "13,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-13\n" +
+            "14,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-14\n" +
+            "15,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-15\n" +
+            "16,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-16\n" +
+            "17,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-17\n" +
+            "18,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-18\n" +
+            "19,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-19\n" +
+            "2,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-2\n" +
+            "20,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-20\n" +
+            "3,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-3\n" +
+            "4,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-4\n" +
+            "5,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-5\n" +
+            "6,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-6\n" +
+            "7,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-7\n" +
+            "8,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-8\n" +
+            "9,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-9\n";
+
+        assertEquals(expected, resultSetToString(res));
+      } finally {
+        res.close();
+      }
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoDifferentType() throws Exception {
+    // Target row key is TEXT while the source 'id' column is INT4; the insert
+    // must be rejected with DATATYPE_MISMATCH rather than silently coerced.
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<String>();
+    for (int i = 99; i >= 0; i--) {
+      datas.add(i + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    try {
+      executeString("insert into hbase_mapped_table " +
+          "select id, name from base_table ").close();
+      fail("If inserting data type different with target table data type, should throw exception");
+    } catch (TajoException e) {
+      assertEquals(ResultCode.DATATYPE_MISMATCH, e.getErrorCode());
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
+
+  @Test
+  public void testInsertIntoRowField() throws Exception {
+    // Inserts into a table whose row key is composed of two fields
+    // (l_orderkey, l_partkey) joined by the '_' delimiter.
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "'hbase.rowkey.delimiter'='_')").close();
+
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    // Close the statement result like every sibling test does; the previous
+    // code dropped it without closing.
+    executeString("insert into hbase_mapped_table " +
+        "select l_orderkey::text, l_partkey::text, l_shipdate, l_returnflag, l_suppkey::text from default.lineitem ").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), Bytes.toBytes(""), Bytes.toBytes("b")},
+          new boolean[]{false, false, false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testCTAS() throws Exception {
+    // CREATE TABLE ... AS SELECT into an HBase-backed table with pre-split
+    // regions, verified by scanning the backing HBase table directly.
+    // create test table
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.TEXT);
+    schema.addColumn("name", Type.TEXT);
+    List<String> datas = new ArrayList<>();
+    DecimalFormat df = new DecimalFormat("000");
+    for (int i = 99; i >= 0; i--) {
+      datas.add(df.format(i) + "|value" + i);
+    }
+    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+        schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
+        "'hbase.split.rowkeys'='010,040,060,080') as" +
+        " select id, name from base_table"
+    ).close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1")},
+          new byte[][]{null, Bytes.toBytes("a")},
+          new boolean[]{false, false}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+
+      // TODO - rollback should support its corresponding hbase table
+      // HBaseAdmin is Closeable; close it even if disable/delete throws
+      // (the previous code never closed it).
+      HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+      try {
+        if (hAdmin.tableExists("hbase_table")) {
+          hAdmin.disableTable("hbase_table");
+          hAdmin.deleteTable("hbase_table");
+        }
+      } finally {
+        hAdmin.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoUsingPut() throws Exception {
+    // Runs the insert with the INSERT_PUT_MODE session variable enabled, i.e.
+    // rows are written via HBase Puts instead of the default bulk-load path.
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    Map<String, String> sessions = new HashMap<>();
+    sessions.put(HBaseStorageConstants.INSERT_PUT_MODE, "true");
+    client.updateSessionVariables(sessions);
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      executeString(
+          "insert into hbase_mapped_table " +
+          "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem"
+      ).close();
+
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      // The result differs from testInsertInto because l_orderkey is not unique:
+      // in put mode later rows with the same key overwrite earlier ones.
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      // Restore the session so later tests are not run in put mode.
+      client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE));
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
+  public void testInsertIntoLocation() throws Exception {
+    // 'insert into location' against an HBase-mapped table should emit HFile
+    // output directories under the given path rather than writing to HBase.
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " +
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
+
+    assertTableExists("hbase_mapped_table");
+
+    try {
+      // create test table
+      KeyValueSet tableOptions = new KeyValueSet();
+      tableOptions.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+      tableOptions.set(StorageConstants.TEXT_NULL, "\\\\N");
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.TEXT);
+      schema.addColumn("name", Type.TEXT);
+      schema.addColumn("comment", Type.TEXT);
+      List<String> datas = new ArrayList<>();
+      DecimalFormat df = new DecimalFormat("000");
+      for (int i = 99; i >= 0; i--) {
+        datas.add(df.format(i) + "|value" + i + "|comment-" + i);
+      }
+      TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
+          schema, tableOptions, datas.toArray(new String[]{}), 2);
+
+      executeString("insert into location '/tmp/hfile_test' " +
+          "select id, name, comment from base_table ").close();
+
+      FileSystem fs = testingCluster.getDefaultFileSystem();
+      Path path = new Path("/tmp/hfile_test");
+      assertTrue(fs.exists(path));
+
+      // Two partition directories are expected, each containing non-empty files.
+      FileStatus[] files = fs.listStatus(path);
+      assertNotNull(files);
+      assertEquals(2, files.length);
+
+      int index = 0;
+      for (FileStatus eachFile: files) {
+        assertEquals("/tmp/hfile_test/part-01-00000" + index + "-00" + index, eachFile.getPath().toUri().getPath());
+        for (FileStatus subFile: fs.listStatus(eachFile.getPath())) {
+          assertTrue(subFile.isFile());
+          assertTrue(subFile.getLen() > 0);
+        }
+        index++;
+      }
+    } finally {
+      executeString("DROP TABLE base_table PURGE").close();
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+    }
+  }
+
+  /**
+   * Renders an HBase scan result as one comma-separated line per row for
+   * comparison with expected-output files.
+   *
+   * The three arrays are parallel, one entry per table column:
+   * cfNames[i]   - column family for column i, or null if column i maps to the
+   *                row key (row-key columns other than the first are skipped);
+   * qualifiers[i]- qualifier for column i, or null to dump the whole family as
+   *                a {"key": "value", ...} map;
+   * binaries[i]  - true if column i was stored in binary form ('#b' mapping).
+   */
+  private String resultSetToString(ResultScanner scanner, byte[][] cfNames, byte[][] qualifiers,
+                                   boolean [] binaries, Schema schema) throws Exception {
+    StringBuilder sb = new StringBuilder();
+    Result result = null;
+    while ( (result = scanner.next()) != null ) {
+      // Row key first: binary-deserialize when binaries[0] is set.
+      if (binaries[0]) {
+        sb.append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(0), result.getRow()).asChar());
+      } else {
+        // NOTE(review): relies on the platform default charset — presumably
+        // fine for the ASCII keys used in these tests.
+        sb.append(new String(result.getRow()));
+      }
+
+      for (int i = 0; i < cfNames.length; i++) {
+        if (cfNames[i] == null) {
+          //rowkey
+          continue;
+        }
+        if (qualifiers[i] == null) {
+          // Whole-family dump: print each qualifier/value pair as a JSON-like map.
+          Map<byte[], byte[]> values = result.getFamilyMap(cfNames[i]);
+          if (values == null) {
+            sb.append(", null");
+          } else {
+            sb.append(", {");
+            String delim = "";
+            for (Map.Entry<byte[], byte[]> valueEntry: values.entrySet()) {
+              byte[] keyBytes = valueEntry.getKey();
+              byte[] valueBytes = valueEntry.getValue();
+
+              if (binaries[i]) {
+                // NOTE(review): binary qualifiers are assumed to be longs here.
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : Bytes.toLong(keyBytes)).append("\"");
+                sb.append(": \"").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
+              } else {
+                sb.append(delim).append("\"").append(keyBytes == null ? "" : new String(keyBytes)).append("\"");
+                sb.append(": \"").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
+              }
+              delim = ", ";
+            }
+            sb.append("}");
+          }
+        } else {
+          // Single-cell column: fetch exactly one qualifier's value.
+          byte[] value = result.getValue(cfNames[i], qualifiers[i]);
+          if (value == null) {
+            sb.append(", null");
+          } else {
+            if (binaries[i]) {
+              sb.append(", ").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), value));
+            } else {
+              sb.append(", ").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), value));
+            }
+          }
+        }
+      }
+      sb.append("\n");
+    }
+
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java
new file mode 100644
index 0000000..fe465f1
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInSubquery.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.SQLException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for IN / NOT IN subqueries, run for each parameterized
+ * join option inherited from TestJoinQuery. Most cases are data-driven:
+ * the query and expected result come from files resolved by runSimpleTests().
+ */
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestInSubquery extends TestJoinQuery {
+
+  public TestInSubquery(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+  }
+
+  @AfterClass
+  public static void classTearDown() throws SQLException {
+    TestJoinQuery.classTearDown();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testInSubQuery2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testNestedInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testInSubQueryWithOtherConditions() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testMultipleInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testInSubQueryWithJoin() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testInSubQueryWithTableSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testNotInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testMultipleNotInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testNestedNotInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testInAndNotInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testNestedInAndNotInSubQuery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testNestedInSubQuery2() throws Exception {
+    // select c_name from customer
+    // where c_nationkey in (
+    //    select n_nationkey from nation where n_name like 'C%' and n_regionkey in (
+    //    select count(*)-1 from region where r_regionkey > 0 and r_regionkey < 3))
+    runSimpleTests();
+  }
+
+  @Test()
+  public final void testCorrelatedSubQuery() throws Exception {
+    // Correlated subqueries are not supported yet; expect NOT_IMPLEMENTED.
+    // Use try-catch clause to verify the exact error message
+    try {
+      executeString("select * from nation where n_regionkey in (select r_regionkey from region where n_name > r_name)");
+      fail("Correlated subquery must raise the UnimplementedException.");
+    } catch (TajoRuntimeException e) {
+      assertEquals(ResultCode.NOT_IMPLEMENTED, e.getErrorCode());
+    }
+  }
+
+  @Test
+  @Option(withExplain = false, withExplainGlobal = false, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testSameKeyNameOfOuterAndInnerQueries() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testWithAsteriskAndJoin() throws Exception {
+    // select * from lineitem, orders where l_orderkey = o_orderkey and l_partkey in
+    // (select l_partkey from lineitem where l_linenumber in (1, 3, 5, 7, 9))
+    runSimpleTests();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java
new file mode 100644
index 0000000..a255c6d
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestIndexScan.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.QueryTestCaseBase;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.exception.NoSuchSessionVariableException;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+@Category(IntegrationTest.class)
+public class TestIndexScan extends QueryTestCaseBase {
+
+  public TestIndexScan() throws ServiceException, SQLException, NoSuchSessionVariableException {
+    super(TajoConstants.DEFAULT_DATABASE_NAME);
+    Map<String,String> sessionVars = new HashMap<String, String>();
+    sessionVars.put(SessionVars.INDEX_ENABLED.keyname(), "true");
+    sessionVars.put(SessionVars.INDEX_SELECTIVITY_THRESHOLD.keyname(), "0.01f");
+    client.updateSessionVariables(sessionVars);
+  }
+
+  @Test
+  public final void testOnSortedNonUniqueKeys() throws Exception {
+    executeString("create index l_orderkey_idx on lineitem (l_orderkey)");
+    try {
+      ResultSet res = executeString("select * from lineitem where l_orderkey = 1;");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index l_orderkey_idx");
+    }
+  }
+
+  @Test
+  public final void testOnUnsortedTextKeys() throws Exception {
+    executeString("create index l_shipdate_idx on lineitem (l_shipdate)");
+    try {
+      ResultSet res = executeString("select l_orderkey, l_shipdate, l_comment from lineitem where l_shipdate = '1997-01-28';");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index l_shipdate_idx");
+    }
+  }
+
+  @Test
+  public final void testOnMultipleKeys() throws Exception {
+    executeString("create index multikey_idx on lineitem (l_shipdate asc null last, l_tax desc null first, l_shipmode, l_linenumber desc null last)");
+    try {
+      ResultSet res = executeString("select l_orderkey, l_shipdate, l_comment from lineitem " +
+          "where l_shipdate = '1997-01-28' and l_tax = 0.05 and l_shipmode = 'RAIL' and l_linenumber = 1;");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index multikey_idx");
+    }
+  }
+
+  @Test
+  public final void testOnMultipleKeys2() throws Exception {
+    executeString("create index multikey_idx on lineitem (l_shipdate asc null last, l_tax desc null first)");
+    try {
+      ResultSet res = executeString("select l_orderkey, l_shipdate, l_comment from lineitem " +
+          "where l_shipdate = '1997-01-28' and l_tax = 0.05 and l_shipmode = 'RAIL' and l_linenumber = 1;");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index multikey_idx");
+    }
+  }
+
+  @Test
+  public final void testOnMultipleExprs() throws Exception {
+    executeString("create index l_orderkey_100_l_linenumber_10_idx on lineitem (l_orderkey*100-l_linenumber*10 asc null first);");
+    try {
+      ResultSet res = executeString("select l_orderkey, l_linenumber from lineitem where l_orderkey*100-l_linenumber*10 = 280");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index l_orderkey_100_l_linenumber_10_idx");
+    }
+  }
+
+  @Test
+  public final void testWithGroupBy() throws Exception {
+    executeString("create index l_shipdate_idx on lineitem (l_shipdate)");
+    try {
+      ResultSet res = executeString("select l_shipdate, count(*) from lineitem where l_shipdate = '1997-01-28' group by l_shipdate;");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index l_shipdate_idx");
+    }
+  }
+
+  @Test
+  public final void testWithSort() throws Exception {
+    executeString("create index l_orderkey_idx on lineitem (l_orderkey)");
+    try {
+      ResultSet res = executeString("select l_shipdate from lineitem where l_orderkey = 1 order by l_shipdate;");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index l_orderkey_idx");
+    }
+  }
+
+  @Test
+  public final void testWithJoin() throws Exception {
+    executeString("create index l_orderkey_idx on lineitem (l_orderkey)");
+    executeString("create index o_orderkey_idx on orders (o_orderkey)");
+    try {
+      ResultSet res = executeString("select l_shipdate, o_orderstatus from lineitem, orders where l_orderkey = o_orderkey and l_orderkey = 1 and o_orderkey = 1;");
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("drop index l_orderkey_idx");
+      executeString("drop index o_orderkey_idx");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java
new file mode 100644
index 0000000..88b3548
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinQuery.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestInnerJoinQuery extends TestJoinQuery {
+
+  public TestInnerJoinQuery(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+  }
+
+  @AfterClass
+  public static void classTearDown() throws SQLException {
+    TestJoinQuery.classTearDown();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true, sort = true)
+  @SimpleTest(queries = {
+      @QuerySpec("select n_name, r_name, n_regionkey, r_regionkey from nation, region order by n_name, r_name"),
+      // testCrossJoinWithAsterisk
+      @QuerySpec("select region.*, customer.* from region, customer"),
+      @QuerySpec("select region.*, customer.* from customer, region"),
+      @QuerySpec("select * from customer, region"),
+      @QuerySpec("select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name")
+  })
+  public final void testCrossJoin() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true, sort = true)
+  @SimpleTest()
+  public final void testCrossJoinWithThetaJoinConditionInWhere() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testInnerJoinWithThetaJoinConditionInWhere() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testWhereClauseJoin1() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testWhereClauseJoin2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testWhereClauseJoin3() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testWhereClauseJoin4() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testWhereClauseJoin5() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testWhereClauseJoin6() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testTPCHQ2Join() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinQual1() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testJoinCoReferredEvals1() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testJoinCoReferredEvalsWithSameExprs1() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testJoinCoReferredEvalsWithSameExprs2() throws Exception {
+    // including grouping operator
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testInnerJoinAndCaseWhen() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testInnerJoinWithEmptyTable() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testCrossJoinWithEmptyTable1() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest(prepare = {
+      "CREATE DATABASE JOINS",
+      "CREATE TABLE JOINS.part_ as SELECT * FROM part",
+      "CREATE TABLE JOINS.supplier_ as SELECT * FROM supplier"
+  }, cleanup = {
+      "DROP TABLE JOINS.part_ PURGE",
+      "DROP TABLE JOINS.supplier_ PURGE",
+      "DROP DATABASE JOINS"
+  })
+  public final void testJoinOnMultipleDatabases() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  public final void testJoinWithJson() throws Exception {
+    // select length(r_comment) as len, *, c_custkey*10 from customer, region order by len,r_regionkey,r_name
+    ResultSet res = executeJsonQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public final void testJoinOnMultipleDatabasesWithJson() throws Exception {
+    executeString("CREATE DATABASE JOINS");
+    assertDatabaseExists("joins");
+    executeString("CREATE TABLE JOINS.part_ as SELECT * FROM part");
+    assertTableExists("joins.part_");
+    executeString("CREATE TABLE JOINS.supplier_ as SELECT * FROM supplier");
+    assertTableExists("joins.supplier_");
+
+    try {
+      ResultSet res = executeJsonQuery();
+      assertResultSet(res);
+      cleanupQuery(res);
+    } finally {
+      executeString("DROP TABLE JOINS.part_ PURGE");
+      executeString("DROP TABLE JOINS.supplier_ PURGE");
+      executeString("DROP DATABASE JOINS");
+    }
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinAsterisk() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testDifferentTypesJoinCondition() throws Exception {
+    // select * from table20 t3 join table21 t4 on t3.id = t4.id;
+    executeDDL("table1_int8_ddl.sql", "table1", "table20");
+    executeDDL("table1_int4_ddl.sql", "table1", "table21");
+    try {
+      runSimpleTests();
+    } finally {
+      executeString("DROP TABLE table20");
+      executeString("DROP TABLE table21");
+    }
+  }
+
+  @Test
+  @SimpleTest
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  public void testComplexJoinCondition1() throws Exception {
+    // select n1.n_nationkey, n1.n_name, n2.n_name  from nation n1 join nation n2 on n1.n_name = upper(n2.n_name);
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition2() throws Exception {
+    // select n1.n_nationkey, n1.n_name, upper(n2.n_name) name from nation n1 join nation n2
+    // on n1.n_name = upper(n2.n_name);
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition3() throws Exception {
+    // select n1.n_nationkey, n1.n_name, n2.n_name from nation n1 join nation n2 on lower(n1.n_name) = lower(n2.n_name);
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition4() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithOrPredicates() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testNaturalJoin() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testCrossJoinAndCaseWhen() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testCrossJoinWithAsterisk1() throws Exception {
+    // select region.*, customer.* from region, customer;
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testCrossJoinWithAsterisk2() throws Exception {
+    // select region.*, customer.* from customer, region;
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testCrossJoinWithAsterisk3() throws Exception {
+    // select * from customer, region
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testCrossJoinWithAsterisk4() throws Exception {
+    // select length(r_regionkey), *, c_custkey*10 from customer, region
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastTwoPartJoin() throws Exception {
+    runSimpleTests();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/a4106883/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
new file mode 100644
index 0000000..6c9546e
--- /dev/null
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestInnerJoinWithSubQuery.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.IntegrationTest;
+import org.apache.tajo.NamedTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+/**
+ * Integration tests for inner joins that involve subqueries (derived
+ * tables), parameterized over the join options supplied by
+ * {@link TestJoinQuery}. The annotation-driven tests load their SQL and
+ * expected result files based on the test method name.
+ */
+@Category(IntegrationTest.class)
+@RunWith(Parameterized.class)
+@NamedTest("TestJoinQuery")
+public class TestInnerJoinWithSubQuery extends TestJoinQuery {
+
+  public TestInnerJoinWithSubQuery(String joinOption) throws Exception {
+    super(joinOption);
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestJoinQuery.setup();
+  }
+
+  @AfterClass
+  public static void classTearDown() throws SQLException {
+    TestJoinQuery.classTearDown();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinQual2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinQual3() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testJoinWithMultipleJoinQual4() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  public final void testJoinWithJson2() throws Exception {
+    // Runs the JSON-formatted variant of the query below; the JSON query
+    // file is resolved from this method name.
+    /*
+    select t.n_nationkey, t.n_name, t.n_regionkey, t.n_comment, ps.ps_availqty, s.s_suppkey
+    from (
+      select n_nationkey, n_name, n_regionkey, n_comment
+      from nation n
+      join region r on (n.n_regionkey = r.r_regionkey)
+    ) t
+    join supplier s on (s.s_nationkey = t.n_nationkey)
+    join partsupp ps on (s.s_suppkey = ps.ps_suppkey)
+    where t.n_name in ('ARGENTINA','ETHIOPIA', 'MOROCCO');
+     */
+    ResultSet res = executeJsonQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition5() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition6() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public void testComplexJoinCondition7() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastSubquery() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testBroadcastSubquery2() throws Exception {
+    runSimpleTests();
+  }
+
+  @Test
+  @Option(withExplain = true, withExplainGlobal = true, parameterized = true)
+  @SimpleTest()
+  public final void testThetaJoinKeyPairs() throws Exception {
+    runSimpleTests();
+  }
+}