You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:08:57 UTC

[05/15] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces.

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index b6a4707..3d2d857 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -33,12 +33,13 @@ import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.hbase.*;
 import org.apache.tajo.util.Bytes;
@@ -49,7 +50,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import java.io.IOException;
 import java.net.InetAddress;
+import java.net.URI;
 import java.sql.ResultSet;
 import java.text.DecimalFormat;
 import java.util.*;
@@ -61,10 +64,11 @@ import static org.junit.Assert.assertEquals;
 public class TestHBaseTable extends QueryTestCaseBase {
   private static final Log LOG = LogFactory.getLog(TestHBaseTable.class);
 
+  private static String tableSpaceUri;
   private static String hostName,zkPort;
 
   @BeforeClass
-  public static void beforeClass() {
+  public static void beforeClass() throws IOException {
     try {
       testingCluster.getHBaseUtil().startHBaseCluster();
       hostName = InetAddress.getLocalHost().getHostName();
@@ -74,6 +78,11 @@ public class TestHBaseTable extends QueryTestCaseBase {
     } catch (Exception e) {
       e.printStackTrace();
     }
+
+    tableSpaceUri = "hbase:zk://" + hostName + ":" + zkPort;
+    HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
+    hBaseTablespace.init(new TajoConf(testingCluster.getHBaseUtil().getConf()));
+    TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
   }
 
   @AfterClass
@@ -88,8 +97,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
   @Test
   public void testVerifyCreateHBaseTableRequiredMeta() throws Exception {
     try {
-      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
-          "USING hbase").close();
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) TABLESPACE cluster1 USING hbase").close();
 
       fail("hbase table must have 'table' meta");
     } catch (Exception e) {
@@ -97,7 +105,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     }
 
     try {
-      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
+      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) TABLESPACE cluster1 " +
           "USING hbase " +
           "WITH ('table'='hbase_table')").close();
 
@@ -109,10 +117,9 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testCreateHBaseTable() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
-        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+    executeString(
+        "CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b')").close();
 
     assertTableExists("hbase_mapped_table1");
 
@@ -138,11 +145,12 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testCreateNotExistsExternalHBaseTable() throws Exception {
+    String sql = String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b') " +
+            "LOCATION '%s/external_hbase_table'", tableSpaceUri);
     try {
-      executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
-          "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " +
-          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+      executeString(sql).close();
       fail("External table should be a existed table.");
     } catch (Exception e) {
       assertTrue(e.getMessage().indexOf("External table should be a existed table.") >= 0);
@@ -153,10 +161,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
   public void testCreateRowFieldWithNonText() throws Exception {
     try {
       executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, col3 text, col4 text) " +
-          "USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " +
-          "'hbase.rowkey.delimiter'='_', " +
-          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+          "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " +
+          "'hbase.rowkey.delimiter'='_')").close();
       fail("Key field type should be TEXT type");
     } catch (Exception e) {
       assertTrue(e.getMessage().indexOf("Key field type should be TEXT type") >= 0);
@@ -171,10 +177,11 @@ public class TestHBaseTable extends QueryTestCaseBase {
     hTableDesc.addFamily(new HColumnDescriptor("col3"));
     testingCluster.getHBaseUtil().createTable(hTableDesc);
 
-    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
-        "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+    String sql = String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b') " +
+        "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+    executeString(sql).close();
 
     assertTableExists("external_hbase_mapped_table");
 
@@ -198,15 +205,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
     hTableDesc.addFamily(new HColumnDescriptor("col3"));
     testingCluster.getHBaseUtil().createTable(hTableDesc);
 
-    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
-        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+    String sql = String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b') " +
+        "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+    executeString(sql).close();
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
-        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
     try {
@@ -237,15 +245,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
     hTableDesc.addFamily(new HColumnDescriptor("col3"));
     testingCluster.getHBaseUtil().createTable(hTableDesc);
 
-    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " +
-        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key#b,col1:a,col2:,col3:b#b', \n" +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "', \n" +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+    String sql = String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " +
+        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key#b,col1:a,col2:,col3:b#b') " +
+        "LOCATION '%s/external_hbase_table'", tableSpaceUri);
+    executeString(sql).close();
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
-        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
     try {
@@ -289,16 +298,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
     hTableDesc.addFamily(new HColumnDescriptor("col3"));
     testingCluster.getHBaseUtil().createTable(hTableDesc);
 
-    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " +
+    String sql = String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " +
         "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
-        "'hbase.rowkey.delimiter'='_', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.rowkey.delimiter'='_') LOCATION '%s/external_hbase_table'", tableSpaceUri);
+    executeString(sql).close();
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
-        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
     try {
@@ -326,16 +335,16 @@ public class TestHBaseTable extends QueryTestCaseBase {
     hTableDesc.addFamily(new HColumnDescriptor("col3"));
     testingCluster.getHBaseUtil().createTable(hTableDesc);
 
-    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, rk2 text, col3 text) " +
+    String sql = String.format(
+        "CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, rk2 text, col3 text) " +
         "USING hbase WITH ('table'='external_hbase_table', 'columns'='0:key,1:key,col3:a', " +
-        "'hbase.rowkey.delimiter'='_', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.rowkey.delimiter'='_') LOCATION '%s/external_hbase_table'", tableSpaceUri);
+    executeString(sql).close();
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
-        .getConnection(testingCluster.getHBaseUtil().getConf());
+    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
     try {
@@ -356,11 +365,11 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testIndexPredication() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
-        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
-        "'hbase.split.rowkeys'='010,040,060,080', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+    String sql =
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
+        "'hbase.split.rowkeys'='010,040,060,080') ";
+    executeString(sql).close();
 
 
     assertTableExists("hbase_mapped_table");
@@ -404,12 +413,11 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testCompositeRowIndexPredication() throws Exception {
+
     executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " +
-        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
         "'hbase.split.rowkeys'='010,040,060,080', " +
-        "'hbase.rowkey.delimiter'='_', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.rowkey.delimiter'='_')").close();
 
     assertTableExists("hbase_mapped_table");
     HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
@@ -469,7 +477,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
         new ConstEval(new TextDatum("021")));
     scanNode.setQual(evalNodeEq);
-    Tablespace tablespace = TableSpaceManager.getStorageManager(conf, "HBASE");
+    Tablespace tablespace = TableSpaceManager.getByName("cluster1").get();
     List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
     assertEquals(1, fragments.size());
     assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
@@ -559,10 +567,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
   @Test
   public void testNonForwardQuery() throws Exception {
     executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " +
-        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " +
-        "'hbase.split.rowkeys'='010,040,060,080', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
 
     assertTableExists("hbase_mapped_table");
     HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
@@ -599,10 +605,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
   @Test
   public void testJoin() throws Exception {
     executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int8) " +
-        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
-        "'hbase.split.rowkeys'='010,040,060,080', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
 
     assertTableExists("hbase_mapped_table");
     HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
@@ -641,9 +645,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
   @Test
   public void testInsertInto() throws Exception {
     executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
-        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -682,11 +684,9 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoMultiRegion() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
-        "'hbase.split.rowkeys'='010,040,060,080', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -740,11 +740,9 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoMultiRegion2() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
-        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -799,11 +797,10 @@ public class TestHBaseTable extends QueryTestCaseBase {
   public void testInsertIntoMultiRegionWithSplitFile() throws Exception {
     String splitFilePath = currentDatasetPath + "/splits.data";
 
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
-        "'hbase.split.rowkeys.file'='" + splitFilePath + "', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.split.rowkeys.file'='" + splitFilePath + "')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -857,12 +854,11 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoMultiRegionMultiRowFields() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) " +
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " +
         "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " +
-        "'hbase.rowkey.delimiter'='_', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.rowkey.delimiter'='_')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -917,11 +913,9 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoBinaryMultiRegion() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) " +
+    executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " +
-        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -974,11 +968,10 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoColumnKeyValue() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) " +
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
-        "'hbase.rowkey.delimiter'='_', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.rowkey.delimiter'='_')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -1065,11 +1058,10 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoDifferentType() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
-        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9')").close();
 
     assertTableExists("hbase_mapped_table");
 
@@ -1102,11 +1094,10 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoRowField() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) " +
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
-        "'hbase.rowkey.delimiter'='_', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.rowkey.delimiter'='_')").close();
 
 
     assertTableExists("hbase_mapped_table");
@@ -1145,7 +1136,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
   }
 
   @Test
-  public void testCATS() throws Exception {
+  public void testCTAS() throws Exception {
     // create test table
     KeyValueSet tableOptions = new KeyValueSet();
     tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
@@ -1162,13 +1153,11 @@ public class TestHBaseTable extends QueryTestCaseBase {
     TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
         schema, tableOptions, datas.toArray(new String[]{}), 2);
 
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
-        "'hbase.split.rowkeys'='010,040,060,080', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
-        " as " +
-        "select id, name from base_table"
+        "'hbase.split.rowkeys'='010,040,060,080') as" +
+        " select id, name from base_table"
     ).close();
 
     assertTableExists("hbase_mapped_table");
@@ -1199,15 +1188,21 @@ public class TestHBaseTable extends QueryTestCaseBase {
       if (htable != null) {
         htable.close();
       }
+
+      // TODO - rollback should also remove the corresponding HBase table; until then, drop it manually.
+      HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
+      if (hAdmin.tableExists("hbase_table")) {
+        hAdmin.disableTable("hbase_table");
+        hAdmin.deleteTable("hbase_table");
+      }
     }
   }
 
   @Test
   public void testInsertIntoUsingPut() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
-        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) TABLESPACE cluster1 " +
+        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close();
 
     assertTableExists("hbase_mapped_table");
     TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
@@ -1219,8 +1214,10 @@ public class TestHBaseTable extends QueryTestCaseBase {
     HTable htable = null;
     ResultScanner scanner = null;
     try {
-      executeString("insert into hbase_mapped_table " +
-          "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close();
+      executeString(
+          "insert into hbase_mapped_table " +
+          "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem"
+      ).close();
 
       htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 
@@ -1253,11 +1250,10 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
   @Test
   public void testInsertIntoLocation() throws Exception {
-    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) " +
+    executeString(
+        "CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " +
-        "'hbase.split.rowkeys'='010,040,060,080', " +
-        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
+        "'hbase.split.rowkeys'='010,040,060,080')").close();
 
     assertTableExists("hbase_mapped_table");
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index b4334f6..7a671d8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -277,7 +277,7 @@ public class TestInsertQuery extends QueryTestCaseBase {
       TableDesc tableDesc = testingCluster.getMaster().getCatalog().getTableDesc(getCurrentDatabase(), tableName);
       assertNotNull(tableDesc);
 
-      Path path = new Path(tableDesc.getPath());
+      Path path = new Path(tableDesc.getUri());
       FileSystem fs = path.getFileSystem(testingCluster.getConfiguration());
 
       FileStatus[] files = fs.listStatus(path);
@@ -484,10 +484,10 @@ public class TestInsertQuery extends QueryTestCaseBase {
     }
 
     FileSystem fs = FileSystem.get(testingCluster.getConfiguration());
-    assertTrue(fs.exists(new Path(desc.getPath())));
+    assertTrue(fs.exists(new Path(desc.getUri())));
     CompressionCodecFactory factory = new CompressionCodecFactory(testingCluster.getConfiguration());
 
-    for (FileStatus file : fs.listStatus(new Path(desc.getPath()))) {
+    for (FileStatus file : fs.listStatus(new Path(desc.getUri()))) {
       CompressionCodec codec = factory.getCodec(file.getPath());
       assertTrue(codec instanceof DeflateCodec);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index a65c165..1478690 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -262,9 +262,9 @@ public class TestJoinQuery extends QueryTestCaseBase {
           appender.flush();
           appender.close();
         }
-        Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
+        Path dataPath = new Path(table.getUri().toString(), fileIndex + ".csv");
         fileIndex++;
-        appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
+        appender = (((FileTablespace)TableSpaceManager.getLocalFs()))
             .getAppender(tableMeta, schema, dataPath);
         appender.init();
       }
@@ -279,7 +279,7 @@ public class TestJoinQuery extends QueryTestCaseBase {
   protected static void addEmptyDataFile(String tableName, boolean isPartitioned) throws Exception {
     TableDesc table = client.getTableDesc(tableName);
 
-    Path path = new Path(table.getPath());
+    Path path = new Path(table.getUri());
     FileSystem fs = path.getFileSystem(conf);
     if (isPartitioned) {
       List<Path> partitionPathList = getPartitionPathList(fs, path);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
index 0d98b91..397b9ef 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -236,7 +236,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
   private void assertPartitionDirectories(TableDesc desc) throws IOException {
     FileSystem fs = FileSystem.get(conf);
-    Path path = new Path(desc.getPath());
+    Path path = new Path(desc.getUri());
     assertTrue(fs.isDirectory(path));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
@@ -361,7 +361,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res.close();
 
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
-    Path path = new Path(desc.getPath());
+    Path path = new Path(desc.getUri());
 
     FileSystem fs = FileSystem.get(conf);
     assertTrue(fs.isDirectory(path));
@@ -434,7 +434,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res.close();
 
     TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
-    Path path = new Path(desc.getPath());
+    Path path = new Path(desc.getUri());
 
     FileSystem fs = FileSystem.get(conf);
     assertTrue(fs.isDirectory(path));
@@ -486,7 +486,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
     res.close();
 
     desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
-    path = new Path(desc.getPath());
+    path = new Path(desc.getUri());
 
     assertTrue(fs.isDirectory(path));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
@@ -515,7 +515,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
         "R\n" +
         "R\n";
 
-    String tableData = getTableFileContents(new Path(desc.getPath()));
+    String tableData = getTableFileContents(new Path(desc.getUri()));
     assertEquals(expected, tableData);
 
     res = executeString("select * from " + tableName + " where col2 = 2");
@@ -589,7 +589,7 @@ public class TestTablePartitions extends QueryTestCaseBase {
 
     desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName);
 
-    ContentSummary summary = fs.getContentSummary(new Path(desc.getPath()));
+    ContentSummary summary = fs.getContentSummary(new Path(desc.getUri()));
 
     assertEquals(summary.getDirectoryCount(), 1L);
     assertEquals(summary.getFileCount(), 0L);
@@ -627,10 +627,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
 
     FileSystem fs = FileSystem.get(conf);
-    assertTrue(fs.exists(new Path(desc.getPath())));
+    assertTrue(fs.exists(new Path(desc.getUri())));
     CompressionCodecFactory factory = new CompressionCodecFactory(conf);
 
-    Path path = new Path(desc.getPath());
+    Path path = new Path(desc.getUri());
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
@@ -676,10 +676,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
 
     FileSystem fs = FileSystem.get(conf);
-    assertTrue(fs.exists(new Path(desc.getPath())));
+    assertTrue(fs.exists(new Path(desc.getUri())));
     CompressionCodecFactory factory = new CompressionCodecFactory(conf);
 
-    Path path = new Path(desc.getPath());
+    Path path = new Path(desc.getUri());
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
@@ -733,10 +733,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
 
     FileSystem fs = FileSystem.get(conf);
-    assertTrue(fs.exists(new Path(desc.getPath())));
+    assertTrue(fs.exists(new Path(desc.getUri())));
     CompressionCodecFactory factory = new CompressionCodecFactory(conf);
 
-    Path path = new Path(desc.getPath());
+    Path path = new Path(desc.getUri());
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
@@ -828,10 +828,10 @@ public class TestTablePartitions extends QueryTestCaseBase {
     }
 
     FileSystem fs = FileSystem.get(conf);
-    assertTrue(fs.exists(new Path(desc.getPath())));
+    assertTrue(fs.exists(new Path(desc.getUri())));
     CompressionCodecFactory factory = new CompressionCodecFactory(conf);
 
-    Path path = new Path(desc.getPath());
+    Path path = new Path(desc.getUri());
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
     assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index f6fd88f..fc25c27 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -66,7 +66,7 @@ public class TestResultSet {
   public static void setup() throws Exception {
     util = TpchTestBase.getInstance().getTestingCluster();
     conf = util.getConfiguration();
-    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+    sm = TableSpaceManager.getDefault();
 
     scoreSchema = new Schema();
     scoreSchema.addColumn("deptname", Type.TEXT);
@@ -74,7 +74,7 @@ public class TestResultSet {
     scoreMeta = CatalogUtil.newTableMeta("CSV");
     TableStats stats = new TableStats();
 
-    Path p = sm.getTablePath("score");
+    Path p = new Path(sm.getTableUri("default", "score"));
     sm.getFileSystem().mkdirs(p);
     Appender appender = sm.getAppender(scoreMeta, scoreSchema, new Path(p, "score"));
     RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(scoreSchema);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 0f90722..48966bc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -27,14 +27,13 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.plan.LogicalOptimizer;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.Tablespace;
+import org.apache.tajo.plan.LogicalOptimizer;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.AfterClass;
@@ -80,10 +79,9 @@ public class TestExecutionBlockCursor {
     }
 
     analyzer = new SQLAnalyzer();
-    logicalPlanner = new LogicalPlanner(catalog);
+    logicalPlanner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
 
-    Tablespace sm  = TableSpaceManager.getFileStorageManager(conf);
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index 0cec3da..edddc5a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -45,6 +45,7 @@ import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
@@ -104,7 +105,7 @@ public class TestKillQuery {
     Session session = LocalTajoTestingUtility.createDummySession();
     CatalogService catalog = cluster.getMaster().getCatalog();
 
-    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
     LogicalOptimizer optimizer = new LogicalOptimizer(conf);
     Expr expr =  analyzer.parse(queryStr);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -168,7 +169,7 @@ public class TestKillQuery {
     Session session = LocalTajoTestingUtility.createDummySession();
     CatalogService catalog = cluster.getMaster().getCatalog();
 
-    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
     LogicalOptimizer optimizer = new LogicalOptimizer(conf);
     Expr expr =  analyzer.parse(queryStr);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index b19a2e5..863c7b5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -67,8 +67,7 @@ public class TestRowFile {
 
     TableMeta meta = CatalogUtil.newTableMeta("ROWFILE");
 
-    FileTablespace sm =
-        (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = (FileTablespace) TableSpaceManager.get(cluster.getDefaultFileSystem().getUri()).get();
 
     Path tablePath = new Path("/test");
     Path metaPath = new Path(tablePath, ".meta");
@@ -109,7 +108,7 @@ public class TestRowFile {
 
     int tupleCnt = 0;
     start = System.currentTimeMillis();
-    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = sm.getScanner(meta, schema, fragment);
     scanner.init();
     while ((tuple=scanner.next()) != null) {
       tupleCnt++;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result b/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result
deleted file mode 100644
index 72013f2..0000000
--- a/tajo-core/src/test/resources/results/TestHBaseTable/testCATS.result
+++ /dev/null
@@ -1,100 +0,0 @@
-000, value0
-001, value1
-002, value2
-003, value3
-004, value4
-005, value5
-006, value6
-007, value7
-008, value8
-009, value9
-010, value10
-011, value11
-012, value12
-013, value13
-014, value14
-015, value15
-016, value16
-017, value17
-018, value18
-019, value19
-020, value20
-021, value21
-022, value22
-023, value23
-024, value24
-025, value25
-026, value26
-027, value27
-028, value28
-029, value29
-030, value30
-031, value31
-032, value32
-033, value33
-034, value34
-035, value35
-036, value36
-037, value37
-038, value38
-039, value39
-040, value40
-041, value41
-042, value42
-043, value43
-044, value44
-045, value45
-046, value46
-047, value47
-048, value48
-049, value49
-050, value50
-051, value51
-052, value52
-053, value53
-054, value54
-055, value55
-056, value56
-057, value57
-058, value58
-059, value59
-060, value60
-061, value61
-062, value62
-063, value63
-064, value64
-065, value65
-066, value66
-067, value67
-068, value68
-069, value69
-070, value70
-071, value71
-072, value72
-073, value73
-074, value74
-075, value75
-076, value76
-077, value77
-078, value78
-079, value79
-080, value80
-081, value81
-082, value82
-083, value83
-084, value84
-085, value85
-086, value86
-087, value87
-088, value88
-089, value89
-090, value90
-091, value91
-092, value92
-093, value93
-094, value94
-095, value95
-096, value96
-097, value97
-098, value98
-099, value99

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/resources/results/TestHBaseTable/testCTAS.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testCTAS.result b/tajo-core/src/test/resources/results/TestHBaseTable/testCTAS.result
new file mode 100644
index 0000000..72013f2
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testCTAS.result
@@ -0,0 +1,100 @@
+000, value0
+001, value1
+002, value2
+003, value3
+004, value4
+005, value5
+006, value6
+007, value7
+008, value8
+009, value9
+010, value10
+011, value11
+012, value12
+013, value13
+014, value14
+015, value15
+016, value16
+017, value17
+018, value18
+019, value19
+020, value20
+021, value21
+022, value22
+023, value23
+024, value24
+025, value25
+026, value26
+027, value27
+028, value28
+029, value29
+030, value30
+031, value31
+032, value32
+033, value33
+034, value34
+035, value35
+036, value36
+037, value37
+038, value38
+039, value39
+040, value40
+041, value41
+042, value42
+043, value43
+044, value44
+045, value45
+046, value46
+047, value47
+048, value48
+049, value49
+050, value50
+051, value51
+052, value52
+053, value53
+054, value54
+055, value55
+056, value56
+057, value57
+058, value58
+059, value59
+060, value60
+061, value61
+062, value62
+063, value63
+064, value64
+065, value65
+066, value66
+067, value67
+068, value68
+069, value69
+070, value70
+071, value71
+072, value72
+073, value73
+074, value74
+075, value75
+076, value76
+077, value77
+078, value78
+079, value79
+080, value80
+081, value81
+082, value82
+083, value83
+084, value84
+085, value85
+086, value86
+087, value87
+088, value88
+089, value89
+090, value90
+091, value91
+092, value92
+093, value93
+094, value94
+095, value95
+096, value96
+097, value97
+098, value98
+099, value99

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoUsingPut.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoUsingPut.result b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoUsingPut.result
index 1c22960..e0c97ef 100644
--- a/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoUsingPut.result
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertIntoUsingPut.result
@@ -1,3 +1,3 @@
-1, 1996-04-12, {"": "N"}, 7311
+1, 1996-03-13, {"": "N"}, 7706
 2, 1997-01-28, {"": "N"}, 1191
-3, 1993-11-09, {"": "R"}, 6540
+3, 1994-02-02, {"": "R"}, 1798

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/resources/results/TestTajoCli/testDescTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testDescTable.result b/tajo-core/src/test/resources/results/TestTajoCli/testDescTable.result
index ae2af45..d3800ab 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testDescTable.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testDescTable.result
@@ -1,7 +1,7 @@
 OK
 
 table name: default.TEST_DESC_TABLE
-table path: ${table.path}
+table uri: ${table.path}
 store type: CSV
 number of rows: 0
 volume: 0 B
@@ -15,7 +15,7 @@ col2	INT4
 
 
 table name: default.TEST_DESC_TABLE
-table path: ${table.path}
+table uri: ${table.path}
 store type: CSV
 number of rows: 0
 volume: 0 B
@@ -24,6 +24,4 @@ Options:
 
 schema: 
 col1	INT4
-col2	INT4
-
-
+col2	INT4
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/resources/results/TestTajoCli/testDescTableForNestedSchema.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testDescTableForNestedSchema.result b/tajo-core/src/test/resources/results/TestTajoCli/testDescTableForNestedSchema.result
index 83f360b..7eff4af 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testDescTableForNestedSchema.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testDescTableForNestedSchema.result
@@ -1,7 +1,7 @@
 OK
 
 table name: default.TEST_DESC_TABLE_NESTED
-table path: ${table.path}
+table uri: ${table.path}
 store type: CSV
 number of rows: 0
 volume: 0 B
@@ -16,7 +16,7 @@ col3	RECORD (col4 RECORD (col5 TEXT))
 
 
 table name: default.TEST_DESC_TABLE_NESTED
-table path: ${table.path}
+table uri: ${table.path}
 store type: CSV
 number of rows: 0
 volume: 0 B

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-dist/src/main/conf/storage-site.json.template
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/storage-site.json.template b/tajo-dist/src/main/conf/storage-site.json.template
new file mode 100644
index 0000000..2d7b19d
--- /dev/null
+++ b/tajo-dist/src/main/conf/storage-site.json.template
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* HBase Storage Plugin and Tablespace Example */
+/*
+{
+  "spaces": {
+    "hbase-cluster1": {
+      "uri": "hbase://quorum1:port,quorum2:port/"
+    }
+  },
+
+  "storages": {
+    "hbase": {
+      "handler": "org.apache.tajo.storage.hbase.HBaseTablespace",
+      "default-format": "hbase"
+    }
+  }
+}
+*/
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-docs/src/main/sphinx/index.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/index.rst b/tajo-docs/src/main/sphinx/index.rst
index 730bed4..ec65fd3 100644
--- a/tajo-docs/src/main/sphinx/index.rst
+++ b/tajo-docs/src/main/sphinx/index.rst
@@ -37,6 +37,7 @@ Table of Contents:
    functions
    table_management
    table_partitioning
+   storage_plugin
    index_overview
    backup_and_restore
    hive_integration

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-docs/src/main/sphinx/storage_plugin.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/storage_plugin.rst b/tajo-docs/src/main/sphinx/storage_plugin.rst
new file mode 100644
index 0000000..d9c6838
--- /dev/null
+++ b/tajo-docs/src/main/sphinx/storage_plugin.rst
@@ -0,0 +1,47 @@
+*************************************
+Storage Plugin
+*************************************
+
+Overview
+========
+
+Tajo supports various storage systems, such as HDFS, Amazon S3, OpenStack Swift, and HBase. We also plan to support RDBMS storages such as Oracle, MySQL, and PostgreSQL. Tajo already embeds HDFS, S3, OpenStack Swift, and HBase, and it also allows users to register custom storages and data formats to Tajo cluster instances. This section describes how to register custom storages and data formats.
+
+Register custom storage
+=======================
+
+First of all, your storage implementation should be packaged as a jar file. Then, copy the jar file into the ``tajo/extlib`` directory. Next, copy ``conf/storage-site.json.template`` to ``conf/storage-site.json`` and modify the file as described below.
+
+Configuration
+=============
+
+Tajo has a default configuration for built-in storages, such as HDFS, the local file system, and Amazon S3. It also allows users to add custom storage plugins.
+
+The ``conf/storage-site.json`` file has the following structure:
+
+.. code-block:: json
+
+  {
+    "storages": {
+      "${scheme}": {
+        "handler": "${class name}"
+      }
+    }
+  }
+
+Each storage instance (i.e., :doc:`/table_management/tablespaces`) is identified by a URI. The scheme of the URI identifies the storage type. For example, ``hdfs://`` is used for HDFS storage, ``jdbc://`` is used for JDBC-based storage, and ``hbase://`` is used for HBase storage.
+
+You should substitute a scheme name without ``://`` for ``${scheme}``.
+
+See an example for HBase storage.
+
+.. code-block:: json
+
+  {
+    "storages": {
+      "hbase": {
+        "handler": "org.apache.tajo.storage.hbase.HBaseTablespace",
+        "default-format": "hbase"
+      }
+    }
+  }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-docs/src/main/sphinx/table_management.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management.rst b/tajo-docs/src/main/sphinx/table_management.rst
index 2b21ddc..5a4693e 100644
--- a/tajo-docs/src/main/sphinx/table_management.rst
+++ b/tajo-docs/src/main/sphinx/table_management.rst
@@ -8,5 +8,6 @@ In Tajo, a table is a logical view of one data sources. Logically, one table con
     :maxdepth: 1
 
     table_management/table_overview
+    table_management/tablespaces
     table_management/file_formats
     table_management/compression
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-docs/src/main/sphinx/table_management/table_overview.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management/table_overview.rst b/tajo-docs/src/main/sphinx/table_management/table_overview.rst
index b63fb69..5818106 100644
--- a/tajo-docs/src/main/sphinx/table_management/table_overview.rst
+++ b/tajo-docs/src/main/sphinx/table_management/table_overview.rst
@@ -5,6 +5,13 @@ Overview of Tajo Tables
 Overview
 ========
 
+Tablespaces
+===========
+
+A tablespace is a physical location where files or data objects representing data rows can be stored. Once defined, a tablespace can be referred to by name when creating a database or a table. This is especially useful when a Tajo cluster instance uses heterogeneous storage systems such as HDFS, MySQL, and Oracle, because each tablespace can be configured to use a different storage system.
+
+Please refer to :doc:`/table_management/tablespaces` if you want to know more information about tablespaces.
+
 Managed Table
 ================
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-docs/src/main/sphinx/table_management/tablespaces.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/table_management/tablespaces.rst b/tajo-docs/src/main/sphinx/table_management/tablespaces.rst
new file mode 100644
index 0000000..964491c
--- /dev/null
+++ b/tajo-docs/src/main/sphinx/table_management/tablespaces.rst
@@ -0,0 +1,45 @@
+*************************************
+Tablespaces
+*************************************
+
+Tablespaces in Tajo allow users to define locations in the storage system where the files or data objects representing database objects can be stored. Once defined, a tablespace can be referred to by name when creating a database or a table. This is especially useful when a Tajo cluster instance uses heterogeneous storage systems such as HDFS, MySQL, and Oracle.
+
+Configuration
+=============
+
+By default, Tajo uses ``${tajo.rootdir}/warehouse`` in ``conf/tajo-site.xml`` as the default tablespace. It also allows users to register additional tablespaces.
+
+Additional tablespaces are registered in the ``conf/storage-site.json`` file.
+
+The configuration file has the following structure:
+
+.. code-block:: json
+
+  {
+    "spaces": {
+      "${table_space_name}": {
+        "uri": "hbase://quorum1:port,quorum2:port/"
+      }
+    }
+  }
+
+The following example defines two tablespaces, one for HBase and one for HDFS:
+
+.. code-block:: json
+
+  {
+    "spaces": {
+      "hbase-cluster1": {
+        "uri": "hbase://quorum1:port,quorum2:port/"
+      },
+
+      "ssd": {
+        "uri": "hdfs://host:port/data/ssd"
+      }
+    }
+  }
+
+
+.. note::
+
+  Each tablespace can use a different storage type. Please see :doc:`/storage_plugin` for more information.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 5571cdf..a2480c9 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.OverridableConf;
+import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.WindowSpec;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.plan.LogicalPlan.QueryBlock;
@@ -50,11 +50,13 @@ import org.apache.tajo.plan.util.ExprFinder;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.storage.StorageService;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.TUtil;
 
+import java.net.URI;
 import java.util.*;
 
 import static org.apache.tajo.algebra.CreateTable.PartitionType;
@@ -67,13 +69,17 @@ import static org.apache.tajo.plan.LogicalPlan.BlockType;
 public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContext, LogicalNode> {
   private static Log LOG = LogFactory.getLog(LogicalPlanner.class);
   private final CatalogService catalog;
+  private final StorageService storage;
+
   private final LogicalPlanPreprocessor preprocessor;
   private final EvalTreeOptimizer evalOptimizer;
   private final ExprAnnotator exprAnnotator;
   private final ExprNormalizer normalizer;
 
-  public LogicalPlanner(CatalogService catalog) {
+  public LogicalPlanner(CatalogService catalog, StorageService storage) {
     this.catalog = catalog;
+    this.storage = storage;
+
     this.exprAnnotator = new ExprAnnotator(catalog);
     this.preprocessor = new LogicalPlanPreprocessor(catalog, exprAnnotator);
     this.normalizer = new ExprNormalizer();
@@ -1345,9 +1351,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   }
 
   private void updatePhysicalInfo(TableDesc desc) {
-    if (desc.getPath() != null && desc.getMeta().getStoreType() != "SYSTEM") {
+    if (desc.getUri() != null &&
+        desc.getMeta().getStoreType() != "SYSTEM" && PlannerUtil.isFileStorageType(desc.getMeta().getStoreType())) {
       try {
-        Path path = new Path(desc.getPath());
+        Path path = new Path(desc.getUri());
         FileSystem fs = path.getFileSystem(new Configuration());
         FileStatus status = fs.getFileStatus(path);
         if (desc.getStats() != null && (status.isDirectory() || status.isFile())) {
@@ -1689,7 +1696,13 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     insertNode.setInSchema(childSchema);
     insertNode.setOutSchema(childSchema);
     insertNode.setTableSchema(childSchema);
-    insertNode.setTargetLocation(new Path(expr.getLocation()));
+
+    // Rewrite
+    URI targetUri = URI.create(expr.getLocation());
+    if (targetUri.getScheme() == null) {
+      targetUri = URI.create(context.getQueryContext().get(QueryVars.DEFAULT_SPACE_ROOT_URI) + "/" + targetUri);
+    }
+    insertNode.setUri(targetUri);
 
     if (expr.hasStorageType()) {
       insertNode.setStorageType(expr.getStorageType());
@@ -1742,7 +1755,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     createTableNode.setExternal(parentTableDesc.isExternal());
     if(parentTableDesc.isExternal()) {
-      createTableNode.setPath(new Path(parentTableDesc.getPath()));
+      createTableNode.setUri(parentTableDesc.getUri());
     }
     return createTableNode;
   }
@@ -1762,8 +1775,15 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
           CatalogUtil.buildFQName(context.queryContext.get(SessionVars.CURRENT_DATABASE), expr.getTableName()));
     }
     // This is CREATE TABLE <tablename> LIKE <parentTable>
-    if(expr.getLikeParentTableName() != null)
+    if(expr.getLikeParentTableName() != null) {
       return handleCreateTableLike(context, expr, createTableNode);
+    }
+
+    if (expr.hasTableSpaceName()) {
+      createTableNode.setTableSpaceName(expr.getTableSpaceName());
+    }
+
+    createTableNode.setUri(getCreatedTableURI(context, expr));
 
     if (expr.hasStorageType()) { // If storage type (using clause) is specified
       createTableNode.setStorageType(expr.getStorageType());
@@ -1771,8 +1791,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       createTableNode.setStorageType("CSV");
     }
 
-
-
     // Set default storage properties to table
     KeyValueSet properties = CatalogUtil.newPhysicalProperties(createTableNode.getStorageType());
 
@@ -1789,8 +1807,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     createTableNode.setOptions(properties);
 
-
-
     if (expr.hasPartition()) {
       if (expr.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
         createTableNode.setPartitionMethod(getPartitionMethod(context, expr.getTableName(), expr.getPartitionMethod()));
@@ -1850,14 +1866,32 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         createTableNode.setExternal(true);
       }
 
-      if (expr.hasLocation()) {
-        createTableNode.setPath(new Path(expr.getLocation()));
-      }
-
       return createTableNode;
     }
   }
 
+  /**
+   * Return the URI of the table to be created.
+   *
+   * @param context PlanContext
+   * @param createTable An algebraic expression for create table
+   * @return the explicit location if one is given; otherwise the URI that the storage returns for the tablespace
+   */
+  private URI getCreatedTableURI(PlanContext context, CreateTable createTable) {
+    // An explicitly given location is used as-is.
+    if (createTable.hasLocation()) {
+      return URI.create(createTable.getLocation());
+    } else {
+      // Use the qualifier of a fully qualified table name; otherwise the session's CURRENT_DATABASE.
+      String tableName = createTable.getTableName();
+      String databaseName = CatalogUtil.isFQTableName(tableName) ?
+          CatalogUtil.extractQualifier(tableName) : context.queryContext.get(SessionVars.CURRENT_DATABASE);
+      // Ask the storage for the table URI from the tablespace name, database name and simple table name.
+      return storage.getTableURI(
+          createTable.getTableSpaceName(), databaseName, CatalogUtil.extractSimpleName(tableName));
+    }
+  }
+
   private PartitionMethodDesc getPartitionMethod(PlanContext context,
                                                  String tableName,
                                                  CreateTable.PartitionMethodDescExpr expr) throws PlanningException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
index 0ab62d5..46ea458 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
@@ -20,15 +20,14 @@ package org.apache.tajo.plan.logical;
 
 import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.plan.PlanString;
-import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 
+import java.net.URI;
+
 public class CreateTableNode extends StoreTableNode implements Cloneable {
-  @Expose private Schema schema;
-  @Expose private Path path;
+  @Expose private String tableSpaceName;
   @Expose private boolean external;
   @Expose private boolean ifNotExists;
 
@@ -41,34 +40,26 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
     return child == null ? 0 : 1;
   }
 
-  public void setTableSchema(Schema schema) {
-    this.schema = schema;
-  }
-    
-  public Schema getTableSchema() {
-    return this.schema;
-  }
-
   public Schema getLogicalSchema() {
     if (hasPartition()) {
-      Schema logicalSchema = new Schema(schema);
+      Schema logicalSchema = new Schema(tableSchema);
       logicalSchema.addColumns(getPartitionMethod().getExpressionSchema());
       return logicalSchema;
     } else {
-      return schema;
+      return tableSchema;
     }
   }
 
-  public boolean hasPath() {
-    return this.path != null;
+  public boolean hasTableSpaceName() {
+    return tableSpaceName != null;
   }
 
-  public void setPath(Path path) {
-    this.path = path;
+  public String getTableSpaceName() {
+    return tableSpaceName;
   }
-  
-  public Path getPath() {
-    return this.path;
+
+  public void setTableSpaceName(String tableSpaceName) {
+    this.tableSpaceName = tableSpaceName;
   }
 
   public boolean isExternal() {
@@ -97,7 +88,7 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
   }
 
   public int hashCode() {
-    return super.hashCode() ^ Objects.hashCode(schema, path, external, ifNotExists) * 31;
+    return super.hashCode() ^ Objects.hashCode(tableSchema, uri, external, ifNotExists) * 31;
   }
   
   @Override
@@ -105,9 +96,8 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
     if (obj instanceof CreateTableNode) {
       CreateTableNode other = (CreateTableNode) obj;
       boolean eq = super.equals(other);
-      eq &= this.schema.equals(other.schema);
+      eq &= TUtil.checkEquals(tableSpaceName, other.tableSpaceName);
       eq &= this.external == other.external;
-      eq &= TUtil.checkEquals(path, other.path);
       eq &= ifNotExists == other.ifNotExists;;
       return eq;
     } else {
@@ -118,12 +108,8 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     CreateTableNode createTableNode = (CreateTableNode) super.clone();
-    createTableNode.tableName = tableName;
-    createTableNode.schema = (Schema) schema.clone();
-    createTableNode.storageType = storageType;
+    createTableNode.tableSpaceName = tableSpaceName;
     createTableNode.external = external;
-    createTableNode.path = path != null ? new Path(path.toString()) : null;
-    createTableNode.options = (KeyValueSet) (options != null ? options.clone() : null);
     createTableNode.ifNotExists = ifNotExists;
     return createTableNode;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
index ee15951..cb3dcee 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
@@ -20,21 +20,18 @@ package org.apache.tajo.plan.logical;
 
 import com.google.gson.annotations.Expose;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.util.TUtil;
 
 public class InsertNode extends StoreTableNode implements Cloneable {
+  /** Overwrite or just insert */
   @Expose private boolean overwrite;
-  @Expose private Schema tableSchema;
-
   /** a target schema of a target table */
   @Expose private Schema targetSchema;
   /** a output schema of select clause */
   @Expose private Schema projectedSchema;
-  @Expose private Path path;
 
   public InsertNode(int pid) {
     super(pid, NodeType.INSERT);
@@ -47,8 +44,8 @@ public class InsertNode extends StoreTableNode implements Cloneable {
     } else {
       tableSchema = desc.getSchema();
     }
-    if (desc.getPath() != null) {
-      setPath(new Path(desc.getPath()));
+    if (desc.getUri() != null) {
+      setUri(desc.getUri());
     }
     setOptions(desc.getMeta().getOptions());
     setStorageType(desc.getMeta().getStoreType());
@@ -58,10 +55,6 @@ public class InsertNode extends StoreTableNode implements Cloneable {
     }
   }
 
-  public void setTargetLocation(Path path) {
-    this.path = path;
-  }
-
   public void setSubQuery(LogicalNode subQuery) {
     this.setChild(subQuery);
     this.setInSchema(subQuery.getOutSchema());
@@ -76,14 +69,6 @@ public class InsertNode extends StoreTableNode implements Cloneable {
     this.overwrite = overwrite;
   }
 
-  public Schema getTableSchema() {
-    return tableSchema;
-  }
-
-  public void setTableSchema(Schema tableSchema) {
-    this.tableSchema = tableSchema;
-  }
-
   public boolean hasTargetSchema() {
     return this.targetSchema != null;
   }
@@ -108,28 +93,12 @@ public class InsertNode extends StoreTableNode implements Cloneable {
     this.projectedSchema = projected;
   }
 
-  public boolean hasPath() {
-    return this.path != null;
-  }
-
-  public void setPath(Path path) {
-    this.path = path;
-  }
-  
-  public Path getPath() {
-    return this.path;
-  }
-
-  public boolean hasStorageType() {
-    return this.storageType != null;
-  }
-  
   @Override
   public int hashCode() {
     final int prime = 31;
     int result = 1;
     result = prime * result + (overwrite ? 1231 : 1237);
-    result = prime * result + ((path == null) ? 0 : path.hashCode());
+    result = prime * result + ((uri == null) ? 0 : uri.hashCode());
     result = prime * result + ((projectedSchema == null) ? 0 : projectedSchema.hashCode());
     result = prime * result + ((tableSchema == null) ? 0 : tableSchema.hashCode());
     result = prime * result + ((targetSchema == null) ? 0 : targetSchema.hashCode());
@@ -142,9 +111,8 @@ public class InsertNode extends StoreTableNode implements Cloneable {
       InsertNode other = (InsertNode) obj;
       boolean eq = super.equals(other);
       eq &= this.overwrite == other.overwrite;
-      eq &= TUtil.checkEquals(this.tableSchema, other.tableSchema);
       eq &= TUtil.checkEquals(this.targetSchema, other.targetSchema);
-      eq &= TUtil.checkEquals(path, other.path);
+      eq &= TUtil.checkEquals(this.projectedSchema, other.projectedSchema);
       return eq;
     } else {
       return false;
@@ -157,7 +125,8 @@ public class InsertNode extends StoreTableNode implements Cloneable {
     insertNode.overwrite = overwrite;
     insertNode.tableSchema = new Schema(tableSchema);
     insertNode.targetSchema = targetSchema != null ? new Schema(targetSchema) : null;
-    insertNode.path = path != null ? new Path(path.toString()) : null;
+    insertNode.projectedSchema = projectedSchema != null ? new Schema(projectedSchema) : null;
+    insertNode.uri = uri; // java.net.URI is immutable, so the clone can share the reference
     return insertNode;
   }
   
@@ -166,8 +135,8 @@ public class InsertNode extends StoreTableNode implements Cloneable {
     if (hasTargetTable()) {
       sb.append(",table=").append(tableName);
     }
-    if (hasPath()) {
-      sb.append(", location=").append(path);
+    if (hasUri()) {
+      sb.append(", location=").append(uri);
     }
     sb.append(")");
     return sb.toString();
@@ -195,7 +164,7 @@ public class InsertNode extends StoreTableNode implements Cloneable {
         planString.addExplan(getTargetSchema().toString());
       }
     } else {
-      planString.addExplan("LOCATION " + path);
+      planString.addExplan("LOCATION " + uri);
     }
     return planString;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
index 6fd969a..a4bb94c 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java
@@ -63,7 +63,7 @@ public class PartitionedTableScanNode extends ScanNode {
     if (hasQual()) {
       sb.append(", filter=").append(qual);
     }
-    sb.append(", path=").append(getTableDesc().getPath()).append(")");
+    sb.append(", uri=").append(getTableDesc().getUri()).append(")");
 	  return sb.toString();
 	}
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
index a22f592..0ba988f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
@@ -162,7 +162,7 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod
     if (hasQual()) {
       sb.append(", filter=").append(qual);
     }
-    sb.append(", path=").append(getTableDesc().getPath()).append(")");
+    sb.append(", path=").append(getTableDesc().getUri()).append(")");
     return sb.toString();
 	}
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
index 3a40f83..170e13c 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
@@ -20,12 +20,17 @@ package org.apache.tajo.plan.logical;
 
 import com.google.gson.annotations.Expose;
 
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.plan.PlanString;
 import org.apache.tajo.util.TUtil;
 
+import java.net.URI;
+
 public class StoreTableNode extends PersistentStoreNode implements Cloneable {
   @Expose protected String tableName;
+  @Expose protected URI uri;
+  @Expose protected Schema tableSchema;
   @Expose private PartitionMethodDesc partitionDesc;
 
   public StoreTableNode(int pid) {
@@ -57,6 +62,26 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
     return this.tableName;
   }
 
+  public boolean hasUri() {
+    return this.uri != null;
+  }
+
+  public void setUri(URI uri) {
+    this.uri = uri;
+  }
+
+  public URI getUri() {
+    return this.uri;
+  }
+
+  public void setTableSchema(Schema schema) {
+    this.tableSchema = schema;
+  }
+
+  public Schema getTableSchema() {
+    return this.tableSchema;
+  }
+
   public boolean hasPartition() {
     return this.partitionDesc != null;
   }
@@ -93,6 +118,8 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
       StoreTableNode other = (StoreTableNode) obj;
       boolean eq = super.equals(other);
       eq = eq && TUtil.checkEquals(this.tableName, other.tableName);
+      eq = eq && TUtil.checkEquals(uri, other.uri);
+      eq = eq && TUtil.checkEquals(tableSchema, other.tableSchema); // null-safe; keeps the results of the checks above
       eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
       return eq;
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index 4b9fd48..3b1f1a8 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -259,9 +259,9 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
 
     if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates
       return findFilteredPaths(queryContext, paritionValuesSchema,
-          indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getPath()));
+          indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri()));
     } else { // otherwise, we will get all partition paths.
-      return findFilteredPaths(queryContext, paritionValuesSchema, null, new Path(table.getPath()));
+      return findFilteredPaths(queryContext, paritionValuesSchema, null, new Path(table.getUri()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 694e81c..c1d9f9a 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -37,6 +37,7 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 
+import java.net.URI;
 import java.util.*;
 
 /**
@@ -477,10 +478,14 @@ public class LogicalNodeDeserializer {
       createTable.setPartitionMethod(new PartitionMethodDesc(storeTableNodeSpec.getPartitionMethod()));
     }
 
-    createTable.setTableSchema(convertSchema(createTableNodeSpec.getSchema()));
+    createTable.setTableSchema(convertSchema(storeTableNodeSpec.getTableSchema()));
+
+    if (createTableNodeSpec.hasTablespaceName()) {
+     createTable.setTableSpaceName(createTableNodeSpec.getTablespaceName());
+    }
     createTable.setExternal(createTableNodeSpec.getExternal());
-    if (createTableNodeSpec.getExternal() && createTableNodeSpec.hasPath()) {
-      createTable.setPath(new Path(createTableNodeSpec.getPath()));
+    if (createTableNodeSpec.getExternal() && storeTableNodeSpec.hasUri()) {
+      createTable.setUri(URI.create(storeTableNodeSpec.getUri()));
     }
     createTable.setIfNotExists(createTableNodeSpec.getIfNotExists());
 
@@ -512,16 +517,16 @@ public class LogicalNodeDeserializer {
     }
 
     insertNode.setOverwrite(insertNodeSpec.getOverwrite());
-    insertNode.setTableSchema(convertSchema(insertNodeSpec.getTableSchema()));
+    insertNode.setTableSchema(convertSchema(storeTableNodeSpec.getTableSchema()));
     if (insertNodeSpec.hasTargetSchema()) {
       insertNode.setTargetSchema(convertSchema(insertNodeSpec.getTargetSchema()));
     }
     if (insertNodeSpec.hasProjectedSchema()) {
       insertNode.setProjectedSchema(convertSchema(insertNodeSpec.getProjectedSchema()));
     }
-    if (insertNodeSpec.hasPath()) {
-      insertNode.setPath(new Path(insertNodeSpec.getPath()));
-    }
+    if (storeTableNodeSpec.hasUri()) { // uri is optional; leave it null when it was not serialized
+      insertNode.setUri(URI.create(storeTableNodeSpec.getUri()));
+    }
 
     return insertNode;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index 88d831e..6737756 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -491,11 +491,10 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
     PlanProto.StoreTableNodeSpec.Builder storeTableBuilder = buildStoreTableNodeSpec(node);
 
     PlanProto.CreateTableNodeSpec.Builder createTableBuilder = PlanProto.CreateTableNodeSpec.newBuilder();
-    createTableBuilder.setSchema(node.getTableSchema().getProto());
-    createTableBuilder.setExternal(node.isExternal());
-    if (node.isExternal() && node.hasPath()) {
-      createTableBuilder.setPath(node.getPath().toString());
+    if (node.hasTableSpaceName()) {
+      createTableBuilder.setTablespaceName(node.getTableSpaceName());
     }
+    createTableBuilder.setExternal(node.isExternal());
     createTableBuilder.setIfNotExists(node.isIfNotExists());
 
     PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node);
@@ -605,16 +604,13 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
 
     PlanProto.InsertNodeSpec.Builder insertNodeSpec = PlanProto.InsertNodeSpec.newBuilder();
     insertNodeSpec.setOverwrite(node.isOverwrite());
-    insertNodeSpec.setTableSchema(node.getTableSchema().getProto());
+
     if (node.hasProjectedSchema()) {
       insertNodeSpec.setProjectedSchema(node.getProjectedSchema().getProto());
     }
     if (node.hasTargetSchema()) {
       insertNodeSpec.setTargetSchema(node.getTargetSchema().getProto());
     }
-    if (node.hasPath()) {
-      insertNodeSpec.setPath(node.getPath().toString());
-    }
 
     PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node);
     nodeBuilder.setPersistentStore(persistentStoreBuilder);
@@ -641,12 +637,19 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
 
   private static PlanProto.StoreTableNodeSpec.Builder buildStoreTableNodeSpec(StoreTableNode node) {
     PlanProto.StoreTableNodeSpec.Builder storeTableBuilder = PlanProto.StoreTableNodeSpec.newBuilder();
-    if (node.hasPartition()) {
-      storeTableBuilder.setPartitionMethod(node.getPartitionMethod().getProto());
-    }
+
     if (node.hasTableName()) { // It will be false if node is for INSERT INTO LOCATION '...'
       storeTableBuilder.setTableName(node.getTableName());
     }
+
+    if (node.hasUri()) {
+      storeTableBuilder.setUri(node.getUri().toString());
+    }
+    storeTableBuilder.setTableSchema(node.getTableSchema().getProto());
+
+    if (node.hasPartition()) {
+      storeTableBuilder.setPartitionMethod(node.getPartitionMethod().getProto());
+    }
     return storeTableBuilder;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index f66614f..16ca368 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -26,7 +26,6 @@ import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.*;
@@ -41,9 +40,6 @@ import org.apache.tajo.util.TUtil;
 import java.io.IOException;
 import java.util.*;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType.CSV;
-import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType.TEXTFILE;
-
 public class PlannerUtil {
 
   public static final Column [] EMPTY_COLUMNS = new Column[] {};
@@ -915,6 +911,10 @@ public class PlannerUtil {
   }
 
   public static TableDesc getTableDesc(CatalogService catalog, LogicalNode node) throws IOException {
+    if (node.getType() == NodeType.ROOT) {
+      node = ((LogicalRootNode)node).getChild();
+    }
+
     if (node.getType() == NodeType.CREATE_TABLE) {
       return createTableDesc((CreateTableNode)node);
     }
@@ -935,7 +935,7 @@ public class PlannerUtil {
         }
       }
     } else {
-      if (insertNode.getPath() != null) {
+      if (insertNode.getUri() != null) {
         //insert ... location
         return createTableDesc(insertNode);
       }
@@ -951,7 +951,7 @@ public class PlannerUtil {
             createTableNode.getTableName(),
             createTableNode.getTableSchema(),
             meta,
-            createTableNode.getPath() != null ? createTableNode.getPath().toUri() : null);
+            createTableNode.getUri() != null ? createTableNode.getUri() : null);
 
     tableDescTobeCreated.setExternal(createTableNode.isExternal());
 
@@ -970,7 +970,7 @@ public class PlannerUtil {
             insertNode.getTableName(),
             insertNode.getTableSchema(),
             meta,
-            insertNode.getPath() != null ? insertNode.getPath().toUri() : null);
+            insertNode.getUri() != null ? insertNode.getUri() : null);
 
     if (insertNode.hasPartition()) {
       tableDescTobeCreated.setPartitionMethod(insertNode.getPartitionMethod());

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 77a21b7..40b7891 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -223,22 +223,21 @@ message PersistentStoreNode {
 
 message StoreTableNodeSpec { // required PersistentStoreNode
   optional string tableName = 1; // 'INSERT INTO LOCATION' does not require 'table name'.
-  optional PartitionMethodProto partitionMethod = 2;
+  optional string uri = 2; // written only when the store node has a uri
+  required SchemaProto table_schema = 3; // table schema shared by CREATE TABLE and INSERT nodes
+  optional PartitionMethodProto partitionMethod = 4; // written only when the store node has a partition
 }
 
 message InsertNodeSpec { // required PersistentStoreNode and StoreTableSpec
   required bool overwrite = 1;
-  required SchemaProto tableSchema = 2;
-  optional SchemaProto targetSchema = 4;
+  optional SchemaProto targetSchema = 2;
   optional SchemaProto projectedSchema = 3;
-  optional string path = 5;
 }
 
 message CreateTableNodeSpec { // required PersistentStoreNode and StoreTableNodeSpec
-  required SchemaProto schema = 1;
+  optional string tablespace_name = 1;
   required bool external = 2;
   required bool ifNotExists = 3;
-  optional string path = 4;
 }
 
 message DropTableNode {