You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:09:00 UTC

[08/15] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces.

TAJO-1616: Implement TablespaceManager to load Tablespaces.

Closes #602


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d0f37012
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d0f37012
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d0f37012

Branch: refs/heads/index_support
Commit: d0f37012faf1a8a3259210287e9bc7762ce62001
Parents: 7c8477d
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jun 24 14:33:34 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jun 24 14:33:34 2015 -0700

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/algebra/CreateTable.java    |  14 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |   7 +
 .../org/apache/tajo/catalog/DDLBuilder.java     |   5 +-
 .../java/org/apache/tajo/catalog/TableDesc.java |   8 +-
 .../org/apache/tajo/catalog/TestTableDesc.java  |   2 +-
 .../tajo/catalog/store/HiveCatalogStore.java    |   2 +-
 .../catalog/store/TestHiveCatalogStore.java     |  25 +-
 .../tajo/catalog/store/AbstractDBStore.java     |  44 +-
 .../src/main/resources/schemas/derby/derby.xml  |   2 +-
 .../main/resources/schemas/mariadb/mariadb.xml  |   3 +-
 .../src/main/resources/schemas/mysql/mysql.xml  |   3 +-
 .../main/resources/schemas/oracle/oracle.xml    |   3 +-
 .../resources/schemas/postgresql/postgresql.xml |   4 +-
 .../cli/tsql/commands/DescTableCommand.java     |   2 +-
 .../main/java/org/apache/tajo/QueryVars.java    |   4 +-
 .../apache/tajo/storage/StorageConstants.java   |   6 +
 .../org/apache/tajo/storage/StorageService.java |  37 ++
 .../java/org/apache/tajo/util/FileUtil.java     |  14 +-
 .../org/apache/tajo/util/ReflectionUtil.java    |   6 +-
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   8 +-
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |  11 +-
 .../engine/planner/PhysicalPlannerImpl.java     |  14 +-
 .../engine/planner/global/GlobalPlanner.java    |   4 +-
 .../planner/physical/BSTIndexScanExec.java      |   2 +-
 .../planner/physical/ColPartitionStoreExec.java |   2 +-
 .../planner/physical/PhysicalPlanUtil.java      |   2 +-
 .../physical/RangeShuffleFileWriteExec.java     |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   6 +-
 .../engine/planner/physical/StoreTableExec.java |  19 +-
 .../apache/tajo/engine/query/QueryContext.java  |  67 ++-
 .../org/apache/tajo/master/GlobalEngine.java    |  13 +-
 .../java/org/apache/tajo/master/TajoMaster.java |  22 +-
 .../tajo/master/TajoMasterClientService.java    |   3 +-
 .../apache/tajo/master/exec/DDLExecutor.java    |  58 ++-
 .../exec/NonForwardQueryResultFileScanner.java  |   2 +-
 .../apache/tajo/master/exec/QueryExecutor.java  | 238 +++++----
 .../master/exec/prehook/CreateTableHook.java    |  10 +-
 .../prehook/DistributedQueryHookManager.java    |   2 +-
 .../master/exec/prehook/InsertIntoHook.java     |   8 +-
 .../java/org/apache/tajo/querymaster/Query.java |  51 +-
 .../tajo/querymaster/QueryMasterTask.java       |  72 ++-
 .../apache/tajo/querymaster/Repartitioner.java  |  77 +--
 .../java/org/apache/tajo/querymaster/Stage.java |  12 +-
 .../tajo/webapp/QueryExecutorServlet.java       |   2 +-
 .../org/apache/tajo/worker/LegacyTaskImpl.java  |   2 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |   5 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   |   2 +-
 .../resources/webapps/admin/catalogview.jsp     |   3 +-
 .../src/main/resources/webapps/admin/index.jsp  |  23 +-
 .../org/apache/tajo/BackendTestingUtil.java     |   2 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |   4 +-
 .../org/apache/tajo/TajoTestingCluster.java     |  35 +-
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   |   4 +-
 .../org/apache/tajo/client/TestTajoClient.java  |  18 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   3 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |   3 +-
 .../engine/planner/TestLogicalOptimizer.java    |   4 +-
 .../tajo/engine/planner/TestLogicalPlan.java    |   3 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |  92 ++--
 .../tajo/engine/planner/TestPlannerUtil.java    |   5 +-
 .../planner/physical/TestBNLJoinExec.java       |  15 +-
 .../planner/physical/TestBSTIndexExec.java      |   6 +-
 .../planner/physical/TestExternalSortExec.java  |   6 +-
 .../physical/TestFullOuterHashJoinExec.java     |  29 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  40 +-
 .../planner/physical/TestHashAntiJoinExec.java  |  10 +-
 .../planner/physical/TestHashJoinExec.java      |  14 +-
 .../planner/physical/TestHashSemiJoinExec.java  |  14 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  31 +-
 .../planner/physical/TestMergeJoinExec.java     |  10 +-
 .../engine/planner/physical/TestNLJoinExec.java |  14 +-
 .../planner/physical/TestPhysicalPlanner.java   |  56 +--
 .../physical/TestProgressExternalSortExec.java  |   6 +-
 .../physical/TestRightOuterHashJoinExec.java    |  20 +-
 .../physical/TestRightOuterMergeJoinExec.java   |  36 +-
 .../engine/planner/physical/TestSortExec.java   |   6 +-
 .../apache/tajo/engine/query/TestCTASQuery.java |   8 +-
 .../tajo/engine/query/TestCreateTable.java      |   8 +-
 .../tajo/engine/query/TestHBaseTable.java       | 226 +++++----
 .../tajo/engine/query/TestInsertQuery.java      |   6 +-
 .../apache/tajo/engine/query/TestJoinQuery.java |   6 +-
 .../tajo/engine/query/TestTablePartitions.java  |  28 +-
 .../org/apache/tajo/jdbc/TestResultSet.java     |   4 +-
 .../tajo/master/TestExecutionBlockCursor.java   |  10 +-
 .../apache/tajo/querymaster/TestKillQuery.java  |   5 +-
 .../org/apache/tajo/storage/TestRowFile.java    |   5 +-
 .../results/TestHBaseTable/testCATS.result      | 100 ----
 .../results/TestHBaseTable/testCTAS.result      | 100 ++++
 .../testInsertIntoUsingPut.result               |   4 +-
 .../results/TestTajoCli/testDescTable.result    |   8 +-
 .../testDescTableForNestedSchema.result         |   4 +-
 .../src/main/conf/storage-site.json.template    |  35 ++
 tajo-docs/src/main/sphinx/index.rst             |   1 +
 tajo-docs/src/main/sphinx/storage_plugin.rst    |  47 ++
 tajo-docs/src/main/sphinx/table_management.rst  |   1 +
 .../sphinx/table_management/table_overview.rst  |   7 +
 .../sphinx/table_management/tablespaces.rst     |  45 ++
 .../org/apache/tajo/plan/LogicalPlanner.java    |  64 ++-
 .../tajo/plan/logical/CreateTableNode.java      |  44 +-
 .../apache/tajo/plan/logical/InsertNode.java    |  51 +-
 .../plan/logical/PartitionedTableScanNode.java  |   2 +-
 .../org/apache/tajo/plan/logical/ScanNode.java  |   2 +-
 .../tajo/plan/logical/StoreTableNode.java       |  27 ++
 .../rewrite/rules/PartitionedTableRewriter.java |   4 +-
 .../plan/serder/LogicalNodeDeserializer.java    |  17 +-
 .../tajo/plan/serder/LogicalNodeSerializer.java |  25 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |  14 +-
 tajo-plan/src/main/proto/Plan.proto             |  11 +-
 tajo-storage/tajo-storage-common/pom.xml        |  12 +-
 .../org/apache/tajo/storage/FormatProperty.java |  31 ++
 .../org/apache/tajo/storage/MergeScanner.java   |   5 +-
 .../apache/tajo/storage/OldStorageManager.java  | 251 ++++++++++
 .../apache/tajo/storage/StorageProperty.java    |  49 +-
 .../org/apache/tajo/storage/StorageUtil.java    |   4 +-
 .../apache/tajo/storage/TableSpaceManager.java  | 480 ++++++++++++-------
 .../org/apache/tajo/storage/Tablespace.java     | 194 +++++---
 .../src/main/resources/storage-default.json     |  20 +
 tajo-storage/tajo-storage-hbase/pom.xml         |  11 +
 .../storage/hbase/AbstractHBaseAppender.java    |   2 +-
 .../storage/hbase/AddSortForInsertRewriter.java |  91 ----
 .../tajo/storage/hbase/ColumnMapping.java       |  17 +-
 .../tajo/storage/hbase/HBaseFragment.java       |  28 +-
 .../tajo/storage/hbase/HBasePutAppender.java    |  11 +-
 .../apache/tajo/storage/hbase/HBaseScanner.java |   9 +-
 .../tajo/storage/hbase/HBaseTablespace.java     | 209 ++++----
 .../storage/hbase/SortedInsertRewriter.java     | 116 +++++
 .../src/main/proto/StorageFragmentProtos.proto  |  15 +-
 .../tajo/storage/hbase/TestColumnMapping.java   |   2 +-
 .../storage/hbase/TestHBaseStorageManager.java  | 108 -----
 .../tajo/storage/hbase/TestHBaseTableSpace.java | 134 ++++++
 tajo-storage/tajo-storage-hdfs/pom.xml          |   6 +-
 .../org/apache/tajo/storage/FileAppender.java   |  28 +-
 .../org/apache/tajo/storage/FileTablespace.java | 137 ++++--
 .../storage/HashShuffleAppenderManager.java     |   5 +-
 .../tajo/storage/TestCompressionStorages.java   |   4 +-
 .../tajo/storage/TestDelimitedTextFile.java     |   8 +-
 .../tajo/storage/TestFileStorageManager.java    | 233 ---------
 .../apache/tajo/storage/TestFileSystems.java    |   2 +-
 .../apache/tajo/storage/TestFileTablespace.java | 250 ++++++++++
 .../org/apache/tajo/storage/TestLineReader.java |   8 +-
 .../apache/tajo/storage/TestMergeScanner.java   |   6 +-
 .../org/apache/tajo/storage/TestStorages.java   |  48 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |  58 ++-
 .../index/TestSingleCSVFileBSTIndex.java        |   4 +-
 .../apache/tajo/storage/json/TestJsonSerDe.java |   2 +-
 146 files changed, 2966 insertions(+), 1972 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 1f69ac0..75ffd39 100644
--- a/CHANGES
+++ b/CHANGES
@@ -350,6 +350,8 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1616: Implement TablespaceManager to load Tablespaces. (hyunsik)
+
     TAJO-1615: Implement TaskManager. (jinho)
 
     TAJO-1599: Implement NodeResourceManager and Status updater. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
index 2d4a241..5d1599d 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
@@ -38,6 +38,8 @@ public class CreateTable extends Expr {
   private String tableName;
   @Expose @SerializedName("Attributes")
   private ColumnDefinition [] tableElements;
+  @Expose @SerializedName("SpaceName")
+  private String spaceName;
   @Expose @SerializedName("StorageType")
   private String storageType;
   @Expose @SerializedName("Location")
@@ -100,6 +102,18 @@ public class CreateTable extends Expr {
     this.tableElements = tableElements;
   }
 
+  public boolean hasTableSpaceName() {
+    return spaceName != null;
+  }
+
+  public void setTableSpaceName(String spaceName) {
+    this.spaceName = spaceName;
+  }
+
+  public String getTableSpaceName() {
+    return spaceName;
+  }
+
   public boolean hasStorageType() {
     return storageType != null;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index a2e4a9d..6c6915b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.catalog;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.DataTypeUtil;
@@ -31,6 +32,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.exception.InvalidOperationException;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.util.TUtil;
 
@@ -227,6 +229,11 @@ public class CatalogUtil {
     return sb.toString();
   }
 
+  public static Pair<String, String> separateQualifierAndName(String name) {
+    Preconditions.checkArgument(isFQTableName(name), "Must be a qualified name.");
+    return new Pair<String, String>(extractQualifier(name), extractSimpleName(name));
+  }
+
   /**
    * Extract a qualification name from an identifier.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
index 65640b9..62dd894 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.catalog;
 
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.util.KeyValueSet;
 
 import java.util.Map;
@@ -32,7 +31,7 @@ public class DDLBuilder {
     sb.append("--\n")
       .append("-- Name: ").append(CatalogUtil.denormalizeIdentifier(desc.getName())).append("; Type: TABLE;")
       .append(" Storage: ").append(desc.getMeta().getStoreType());
-    sb.append("\n-- Path: ").append(desc.getPath());
+    sb.append("\n-- Path: ").append(desc.getUri());
     sb.append("\n--\n");
     sb.append("CREATE EXTERNAL TABLE ").append(CatalogUtil.denormalizeIdentifier(desc.getName()));
     buildSchema(sb, desc.getSchema());
@@ -109,7 +108,7 @@ public class DDLBuilder {
   }
 
   private static void buildLocationClause(StringBuilder sb, TableDesc desc) {
-    sb.append(" LOCATION '").append(desc.getPath()).append("'");
+    sb.append(" LOCATION '").append(desc.getUri()).append("'");
   }
 
   private static void buildPartitionClause(StringBuilder sb, TableDesc desc) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index 17f9146..4700322 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -89,12 +89,16 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
   public String getName() {
     return this.tableName;
   }
+
+  public boolean hasUri() {
+    return this.uri != null;
+  }
 	
-	public void setPath(URI uri) {
+	public void setUri(URI uri) {
 		this.uri = uri;
 	}
 	
-  public URI getPath() {
+  public URI getUri() {
     return this.uri;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
index 9d84de6..41a0832 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
@@ -78,7 +78,7 @@ public class TestTableDesc {
     TableDesc desc = new TableDesc("table1", schema, info, path.toUri());
     assertEquals("table1", desc.getName());
     
-    assertEquals(path.toUri(), desc.getPath());
+    assertEquals(path.toUri(), desc.getUri());
     assertEquals(info, desc.getMeta());
     testClone(desc);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index 3ea263a..8f23db4 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -443,7 +443,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
         table.setTableType(TableType.EXTERNAL_TABLE.name());
         table.putToParameters("EXTERNAL", "TRUE");
 
-        Path tablePath = new Path(tableDesc.getPath());
+        Path tablePath = new Path(tableDesc.getUri());
         FileSystem fs = tablePath.getFileSystem(conf);
         if (fs.isFile(tablePath)) {
           LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
index 6e91b89..946d271 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
@@ -43,7 +43,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.*;
 
 /**
@@ -105,7 +104,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -134,7 +133,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -163,7 +162,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -197,7 +196,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -247,7 +246,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, NATION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -261,8 +260,8 @@ public class TestHiveCatalogStore {
       assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName());
     }
 
-    testAddPartition(table1.getPath(), NATION, "n_nationkey=10/n_date=20150101");
-    testAddPartition(table1.getPath(), NATION, "n_nationkey=20/n_date=20150102");
+    testAddPartition(table1.getUri(), NATION, "n_nationkey=10/n_date=20150101");
+    testAddPartition(table1.getUri(), NATION, "n_nationkey=20/n_date=20150102");
 
     testDropPartition(NATION, "n_nationkey=10/n_date=20150101");
     testDropPartition(NATION, "n_nationkey=20/n_date=20150102");
@@ -370,7 +369,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName));
     FileSystem fs = FileSystem.getLocal(new Configuration());
-    assertTrue(fs.exists(new Path(table1.getPath())));
+    assertTrue(fs.exists(new Path(table1.getUri())));
 
     store.dropTable(DB_NAME, tableName);
     assertFalse(store.existTable(DB_NAME, tableName));
@@ -395,7 +394,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -424,7 +423,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -456,7 +455,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -491,7 +490,7 @@ public class TestHiveCatalogStore {
 
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(table.getPath(), table1.getPath());
+    assertEquals(table.getUri(), table1.getUri());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 34740c0..043c8bc 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -761,36 +761,25 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
       int dbid = getDatabaseId(databaseName);
 
-      if (table.getIsExternal()) {
-        String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME + ", TABLE_TYPE, PATH, STORE_TYPE) VALUES(?, ?, ?, ?, ?) ";
+      String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME +
+          ", TABLE_TYPE, PATH, STORE_TYPE) VALUES(?, ?, ?, ?, ?) ";
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
 
-        pstmt = conn.prepareStatement(sql);
-        pstmt.setInt(1, dbid);
-        pstmt.setString(2, tableName);
+      pstmt = conn.prepareStatement(sql);
+      pstmt.setInt(1, dbid);
+      pstmt.setString(2, tableName);
+      if (table.getIsExternal()) {
         pstmt.setString(3, TableType.EXTERNAL_TABLE.name());
-        pstmt.setString(4, table.getPath());
-        pstmt.setString(5, table.getMeta().getStoreType());
-        pstmt.executeUpdate();
-        pstmt.close();
       } else {
-        String sql = "INSERT INTO TABLES (DB_ID, " + COL_TABLES_NAME + ", TABLE_TYPE, STORE_TYPE) VALUES(?, ?, ?, ?) ";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        pstmt = conn.prepareStatement(sql);
-        pstmt.setInt(1, dbid);
-        pstmt.setString(2, tableName);
         pstmt.setString(3, TableType.BASE_TABLE.name());
-        pstmt.setString(4, table.getMeta().getStoreType());
-        pstmt.executeUpdate();
-        pstmt.close();
       }
+      pstmt.setString(4, table.getPath());
+      pstmt.setString(5, table.getMeta().getStoreType());
+      pstmt.executeUpdate();
+      pstmt.close();
 
       String tidSql =
           "SELECT TID from " + TB_TABLES + " WHERE " + COL_DATABASES_PK + "=? AND " + COL_TABLES_NAME + "=?";
@@ -1612,12 +1601,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
         tableBuilder.setIsExternal(true);
       }
 
-      if (tableType == TableType.BASE_TABLE) {
-        tableBuilder.setPath(databaseIdAndUri.getSecond() + "/" + tableName);
-      } else {
-        tableBuilder.setPath(res.getString(4).trim());
-      }
-
+      tableBuilder.setPath(res.getString(4).trim());
       storeType = res.getString(5).trim();
 
       res.close();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
index a4ff00f..e0bd469 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml
@@ -24,7 +24,7 @@
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-	<tns:base version="4">
+	<tns:base version="5">
 		<tns:objects>
 			<tns:Object order="0" type="table" name="META">
 				<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
index 79ccd0a..7485da1 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/mariadb.xml
@@ -19,12 +19,13 @@
 <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
       Catalog base version history
+      * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
       * 4 - 2015-03-27: Partition Schema (TAJO-1284)
       * 3 - 2015-03-12: Nested Schema (TAJO-1329)
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-  <tns:base version="4">
+  <tns:base version="5">
     <tns:objects>
       <tns:Object order="0" type="table" name="META">
         <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
index 34337fb..2bde04f 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/mysql.xml
@@ -19,12 +19,13 @@
 <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
     Catalog base version history
+    * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
     * 4 - 2015-03-27: Partition Schema (TAJO-1284)
     * 3 - 2015-03-12: Nested Schema (TAJO-1329)
     * 2 - 2014-06-09: First versioning
     * 1-  Before 2013-03-20
   -->
-  <tns:base version="4">
+  <tns:base version="5">
     <tns:objects>
       <tns:Object order="0" type="table" name="META">
         <tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
index 323e22c..2778e0c 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml
@@ -19,12 +19,13 @@
 <tns:store xmlns:tns="http://tajo.apache.org/catalogstore" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
       Catalog base version history
+      * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
       * 4 - 2015-03-27: Partition Schema (TAJO-1284)
       * 3 - 2015-03-12: Nested Schema (TAJO-1329)
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-  <tns:base version="4">
+  <tns:base version="5">
     <tns:objects>
   		<tns:Object order="0" type="table" name="meta">
   			<tns:sql><![CDATA[

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
index 554acd5..0051242 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml
@@ -21,12 +21,14 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition.xsd ">
   <!--
       Catalog base version history
+      * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
+      * 5 - 2015-06-15: Implement TablespaceManager to load Tablespaces (TAJO-1616)
       * 4 - 2015-03-27: Partition Schema (TAJO-1284)
       * 3 - 2015-03-12: Nested Schema (TAJO-1329)
       * 2 - 2014-06-09: First versioning
       * 1-  Before 2013-03-20
     -->
-	<tns:base version="4">
+	<tns:base version="5">
 		<tns:objects>
 			<tns:Object name="META" type="table" order="0">
 				<tns:sql><![CDATA[CREATE TABLE META (VERSION INT NOT NULL)]]></tns:sql>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
index 6df26b7..6b2905a 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tsql/commands/DescTableCommand.java
@@ -78,7 +78,7 @@ public class DescTableCommand extends TajoShellCommand {
   protected String toFormattedString(TableDesc desc) {
     StringBuilder sb = new StringBuilder();
     sb.append("\ntable name: ").append(desc.getName()).append("\n");
-    sb.append("table path: ").append(desc.getPath()).append("\n");
+    sb.append("table uri: ").append(desc.getUri()).append("\n");
     sb.append("store type: ").append(desc.getMeta().getStoreType()).append("\n");
     if (desc.getStats() != null) {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
index 55ca700..a6d5d1d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java
@@ -22,9 +22,11 @@ import org.apache.tajo.validation.Validator;
 
 public enum QueryVars implements ConfigKey {
   COMMAND_TYPE,
+  DEFAULT_SPACE_URI,
+  DEFAULT_SPACE_ROOT_URI,
   STAGING_DIR,
   OUTPUT_TABLE_NAME,
-  OUTPUT_TABLE_PATH,
+  OUTPUT_TABLE_URI,
   OUTPUT_PARTITIONS,
   OUTPUT_OVERWRITE,
   OUTPUT_AS_DIRECTORY,

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index d2c6c1c..a9923a5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -20,8 +20,14 @@ package org.apache.tajo.storage;
 
 import org.apache.tajo.TajoConstants;
 
+import java.net.URI;
+
 public class StorageConstants {
 
+  // Tablespace  -------------------------------------------------
+
+  public static final URI LOCAL_FS_URI = URI.create("file:/");
+
   // Common table properties -------------------------------------------------
 
   // time zone

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java
new file mode 100644
index 0000000..1057097
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageService.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import javax.annotation.Nullable;
+import java.net.URI;
+
+/**
+ * TablespaceManager interface for loosely coupled usages
+ */
+public interface StorageService {
+  /**
+   * Get Table URI
+   *
+   * @param spaceName Tablespace name. If it is null, the default space will be used
+   * @param databaseName Database name
+   * @param tableName Table name
+   * @return Table URI
+   */
+  URI getTableURI(@Nullable String spaceName, String databaseName, String tableName);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
index 3e3d3a2..39f4c29 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/FileUtil.java
@@ -87,8 +87,20 @@ public class FileUtil {
     return ClassLoader.getSystemResource(resource);
   }
 
+  /**
+   * It returns a string from a text file found in classpath.
+   *
+   * @param resource Resource file name
+   * @return String contents if exists. Otherwise, it will return null.
+   * @throws IOException
+   */
   public static String readTextFileFromResource(String resource) throws IOException {
-    return readTextFromStream(ClassLoader.getSystemResourceAsStream(resource));
+    InputStream stream = ClassLoader.getSystemResourceAsStream(resource);
+    if (stream != null) {
+      return readTextFromStream(stream);
+    } else {
+      return null;
+    }
   }
 
   public static String readTextFromStream(InputStream inputStream)

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
index e2def69..5a712c0 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/ReflectionUtil.java
@@ -25,9 +25,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 public class ReflectionUtil {
-  private static final Class<?>[] EMPTY_PARAM = new Class[]{};
-  private static final Object [] EMPTY_OBJECT = new Object[] {};
-  private static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class};
+  public static final Class<?>[] EMPTY_PARAM = new Class[]{};
+  public static final Object [] EMPTY_OBJECT = new Object[] {};
+  public static final Class<?>[] CONF_PARAM = new Class[]{TajoConf.class};
 
   /**
    * Caches of constructors for each class. Pins the classes so they

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 3ab11bd..469b2a2 100644
--- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -98,11 +98,11 @@ if_exists
   ;
 
 create_table_statement
-  : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING storage_type=identifier
-    (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal)?
-  | CREATE TABLE (if_not_exists)? table_name table_elements (USING storage_type=identifier)?
+  : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements (TABLESPACE spacename=identifier)? USING storage_type=identifier
+    (param_clause)? (table_partitioning_clauses)? (LOCATION uri=Character_String_Literal)?
+  | CREATE TABLE (if_not_exists)? table_name table_elements (TABLESPACE spacename=identifier)? (USING storage_type=identifier)?
     (param_clause)? (table_partitioning_clauses)? (AS query_expression)?
-  | CREATE TABLE (if_not_exists)? table_name (USING storage_type=identifier)?
+  | CREATE TABLE (if_not_exists)? table_name (TABLESPACE spacename=identifier)? (USING storage_type=identifier)?
     (param_clause)? (table_partitioning_clauses)? AS query_expression
   | CREATE TABLE (if_not_exists)? table_name LIKE like_table_name=table_name
   ;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 7c99868..62bb0f9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -1252,9 +1252,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       createTable.setTableElements(elements);
       createTable.setStorageType(storageType);
 
-      if (PlannerUtil.isFileStorageType(storageType)) {
-        String path = stripQuote(ctx.path.getText());
-        createTable.setLocation(path);
+      if (checkIfExist(ctx.LOCATION())) {
+        String uri = stripQuote(ctx.uri.getText());
+        createTable.setLocation(uri);
       }
     } else {
       if (checkIfExist(ctx.table_elements())) {
@@ -1262,6 +1262,11 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
         createTable.setTableElements(elements);
       }
 
+      if (checkIfExist(ctx.TABLESPACE())) {
+        String spaceName = ctx.spacename.getText();
+        createTable.setTableSpaceName(spaceName);
+      }
+
       if (checkIfExist(ctx.USING())) {
         String fileType = ctx.storage_type.getText();
         createTable.setStorageType(fileType);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 30cb24f..f0b2f5e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -251,7 +252,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       FragmentProto[] fragmentProtos = ctx.getTables(tableId);
       List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
       for (Fragment frag : fragments) {
-        size += Tablespace.getFragmentLength(ctx.getConf(), frag);
+        size += TableSpaceManager.guessFragmentVolume(ctx.getConf(), frag);
       }
     }
     return size;
@@ -924,9 +925,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         if (broadcastFlag) {
           PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
           List<Fragment> fileFragments = TUtil.newList();
-          FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getFileStorageManager(ctx.getConf());
+
+          FileTablespace space = (FileTablespace) TableSpaceManager.get(scanNode.getTableDesc().getUri()).get();
           for (Path path : partitionedTableScanNode.getInputPaths()) {
-            fileFragments.addAll(TUtil.newList(fileStorageManager.split(scanNode.getCanonicalName(), path)));
+            fileFragments.addAll(TUtil.newList(space.split(scanNode.getCanonicalName(), path)));
           }
 
           FragmentProto[] fragments =
@@ -1188,8 +1190,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
 
     String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
-    FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(ctx.getConf());
-    Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index");
+    FileTablespace sm = (FileTablespace) TableSpaceManager.get(fragments.get(0).getPath().toUri()).get();
+    String dbName = CatalogUtil.extractQualifier(annotation.getTableName());
+    String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName());
+    Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index");
 
     TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(),
         annotation.getSortKeys());

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 89e887a..ba4833b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -48,6 +48,7 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
+import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.ReflectionUtil;
 import org.apache.tajo.util.TUtil;
@@ -170,7 +171,8 @@ public class GlobalPlanner {
         "Channel schema (" + channel.getSrcId().getId() + " -> " + channel.getTargetId().getId() +
             ") is not initialized");
     TableMeta meta = new TableMeta(channel.getStoreType(), new KeyValueSet());
-    TableDesc desc = new TableDesc(channel.getSrcId().toString(), channel.getSchema(), meta, new Path("/").toUri());
+    TableDesc desc = new TableDesc(
+        channel.getSrcId().toString(), channel.getSchema(), meta, StorageConstants.LOCAL_FS_URI);
     ScanNode scanNode = plan.createNode(ScanNode.class);
     scanNode.init(desc);
     return scanNode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index bc6975a..54abca8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -56,7 +56,7 @@ public class BSTIndexScanExec extends PhysicalExec {
     this.qual = scanNode.getQual();
     this.datum = datum;
 
-    this.fileScanner = TableSpaceManager.getSeekableScanner(context.getConf(),
+    this.fileScanner = OldStorageManager.getSeekableScanner(context.getConf(),
         scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
     this.fileScanner.init();
     this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 3121671..969998c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -165,7 +165,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
       actualFilePath = new Path(lastFileName + "_" + suffixId);
     }
 
-    appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf()))
+    appender = ((FileTablespace) TableSpaceManager.get(lastFileName.toUri()).get())
         .getAppender(meta, outSchema, actualFilePath);
 
     appender.enableStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index d240edb..deda498 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -74,7 +74,7 @@ public class PhysicalPlanUtil {
    */
   public static CatalogProtos.FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc,
                                                           int fileIndex, int numResultFiles) throws IOException {
-    Path path = new Path(tableDesc.getPath());
+    Path path = new Path(tableDesc.getUri());
     FileSystem fs = path.getFileSystem(tajoConf);
 
     //In the case of partitioned table, we should return same partition key data files.

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 3dd1cd9..fb29e4f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -77,7 +77,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
         context.getDataChannel().getStoreType() : "RAW");
     FileSystem fs = new RawLocalFileSystem();
     fs.mkdirs(storeTablePath);
-    this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf()))
+    this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getDefault())
         .getAppender(meta, outSchema, new Path(storeTablePath, "output"));
     this.appender.enableStats();
     this.appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index b01af6c..d2ae3bd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -202,10 +202,8 @@ public class SeqScanExec extends ScanExec {
             FragmentConvertor.convert(context.getConf(), fragments), projected
         );
       } else {
-        Tablespace tablespace = TableSpaceManager.getStorageManager(
-            context.getConf(), plan.getTableDesc().getMeta().getStoreType());
-        this.scanner = tablespace.getScanner(meta,
-            plan.getPhysicalSchema(), fragments[0], projected);
+        Tablespace tablespace = TableSpaceManager.get(plan.getTableDesc().getUri()).get();
+        this.scanner = tablespace.getScanner(meta, plan.getPhysicalSchema(), fragments[0], projected);
       }
       scanner.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 5b17eee..dd8768e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -35,6 +36,7 @@ import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.net.URI;
 
 /**
  * This is a physical executor to store a table part into a specified storage.
@@ -90,17 +92,26 @@ public class StoreTableExec extends UnaryPhysicalExec {
         lastFileName = new Path(lastFileName + "_" + suffixId);
       }
 
-      appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(context.getConf()))
-          .getAppender(meta, appenderSchema, lastFileName);
+      Optional<FileTablespace> spaceRes = TableSpaceManager.get(lastFileName.toUri());
+      if (!spaceRes.isPresent())  {
+        throw new IllegalStateException("No Tablespace for " + lastFileName.toUri());
+      }
+
+      FileTablespace space = spaceRes.get();
+      appender = space.getAppender(meta, appenderSchema, lastFileName);
 
       if (suffixId > 0) {
         LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " +
             "The remain output will be written into " + lastFileName.toString());
       }
     } else {
-      appender = TableSpaceManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender(
+      Path stagingDir = context.getQueryContext().getStagingDir();
+      appender = TableSpaceManager.get(stagingDir.toUri()).get().getAppender(
           context.getQueryContext(),
-          context.getTaskId(), meta, appenderSchema, context.getQueryContext().getStagingDir());
+          context.getTaskId(),
+          meta,
+          appenderSchema,
+          stagingDir);
     }
 
     appender.enableStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index ee50221..7696c6c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -28,6 +28,8 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.session.Session;
 
+import java.net.URI;
+
 import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto;
 
 /**
@@ -63,6 +65,40 @@ public class QueryContext extends OverridableConf {
     return get(SessionVars.USERNAME);
   }
 
+  /**
+   * Set the default tablespace uri
+   *
+   * @param uri The default tablespace uri
+   */
+  public void setDefaultSpaceUri(URI uri) {
+    put(QueryVars.DEFAULT_SPACE_URI, uri.toString());
+  }
+
+  /**
+   * Return the default tablespace uri
+   */
+  public URI getDefaultSpaceUri() {
+    String strVal = get(QueryVars.DEFAULT_SPACE_URI, "");
+    return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null;
+  }
+
+  /**
+   * Set the root uri of the default tablespace
+   *
+   * @param uri The root uri of the default tablespace
+   */
+  public void setDefaultSpaceRootUri(URI uri) {
+    put(QueryVars.DEFAULT_SPACE_ROOT_URI, uri.toString());
+  }
+
+  /**
+   * Return the root of the default tablespace
+   */
+  public URI getDefaultSpaceRootUri() {
+    String strVal = get(QueryVars.DEFAULT_SPACE_ROOT_URI, "");
+    return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null;
+  }
+
   public void setStagingDir(Path path) {
     put(QueryVars.STAGING_DIR, path.toUri().toString());
   }
@@ -82,24 +118,33 @@ public class QueryContext extends OverridableConf {
   }
 
   /**
-   * The fact that QueryContext has an output path means this query will write the output to a specific directory.
-   * In other words, this query is 'CREATE TABLE' or 'INSERT (OVERWRITE) INTO (<table name>|LOCATION)' statement.
+   * The final output table's uri. It will be set if a query is CTAS or INSERT (OVERWRITE) INTO statement
    *
-   * @return
+   * @return True if a output table uri is set. Otherwise, it will return false
    */
-  public boolean hasOutputPath() {
-    return containsKey(QueryVars.OUTPUT_TABLE_PATH);
+  public boolean hasOutputTableUri() {
+    return containsKey(QueryVars.OUTPUT_TABLE_URI);
   }
 
-  public void setOutputPath(Path path) {
-    if (path != null) {
-      put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString());
+  /**
+   * Set the final output table uri
+   *
+   * @param uri
+   */
+  public void setOutputPath(URI uri) {
+    if (uri != null) {
+      put(QueryVars.OUTPUT_TABLE_URI, uri.toString());
     }
   }
 
-  public Path getOutputPath() {
-    String strVal = get(QueryVars.OUTPUT_TABLE_PATH);
-    return strVal != null ? new Path(strVal) : null;
+  /**
+   * Get the final output table uri
+   *
+   * @return The final output table uri
+   */
+  public URI getOutputTableUri() {
+    String strVal = get(QueryVars.OUTPUT_TABLE_URI);
+    return strVal != null ? URI.create(strVal) : null;
   }
 
   public boolean hasPartition() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2cd585f..e833884 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -53,7 +53,6 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.VerificationState;
 import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 
@@ -68,7 +67,6 @@ public class GlobalEngine extends AbstractService {
   private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
 
   private final MasterContext context;
-  private final Tablespace sm;
 
   private SQLAnalyzer analyzer;
   private CatalogService catalog;
@@ -84,7 +82,6 @@ public class GlobalEngine extends AbstractService {
     super(GlobalEngine.class.getName());
     this.context = context;
     this.catalog = context.getCatalog();
-    this.sm = context.getStorageManager();
 
     this.ddlExecutor = new DDLExecutor(context);
     this.queryExecutor = new QueryExecutor(context, ddlExecutor);
@@ -94,7 +91,7 @@ public class GlobalEngine extends AbstractService {
     try  {
       analyzer = new SQLAnalyzer();
       preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
-      planner = new LogicalPlanner(context.getCatalog());
+      planner = new LogicalPlanner(context.getCatalog(), TableSpaceManager.getInstance());
       optimizer = new LogicalOptimizer(context.getConf());
       annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
     } catch (Throwable t) {
@@ -143,6 +140,10 @@ public class GlobalEngine extends AbstractService {
   private QueryContext createQueryContext(Session session) {
     QueryContext newQueryContext =  new QueryContext(context.getConf(), session);
 
+    // Set default space uri and its root uri
+    newQueryContext.setDefaultSpaceUri(TableSpaceManager.getDefault().getUri());
+    newQueryContext.setDefaultSpaceRootUri(TableSpaceManager.getDefault().getRootUri());
+
     String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY);
     if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) {
       newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest());
@@ -302,8 +303,8 @@ public class GlobalEngine extends AbstractService {
           InsertNode iNode = rootNode.getChild();
           Schema outSchema = iNode.getChild().getOutSchema();
 
-          TableSpaceManager.getStorageManager(queryContext.getConf(), storeType)
-              .verifyInsertTableSchema(tableDesc, outSchema);
+          TableSpaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema);
+
         } catch (Throwable t) {
           state.addVerification(t.getMessage());
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index c41fdde..e1e85dd 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -21,7 +21,10 @@ package org.apache.tajo.master;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.service.CompositeService;
@@ -53,8 +56,6 @@ import org.apache.tajo.rule.SelfDiagnosisRuleSession;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.session.SessionManager;
-import org.apache.tajo.storage.Tablespace;
-import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.*;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
@@ -114,7 +115,6 @@ public class TajoMaster extends CompositeService {
 
   private CatalogServer catalogServer;
   private CatalogService catalog;
-  private Tablespace storeManager;
   private GlobalEngine globalEngine;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
@@ -183,7 +183,6 @@ public class TajoMaster extends CompositeService {
       // check the system directory and create if they are not created.
       checkAndInitializeSystemDirectories();
       diagnoseTajoMaster();
-      this.storeManager = TableSpaceManager.getFileStorageManager(systemConf);
 
       catalogServer = new CatalogServer(loadFunctions());
       addIfService(catalogServer);
@@ -211,7 +210,14 @@ public class TajoMaster extends CompositeService {
       throw e;
     }
 
-    super.serviceInit(systemConf);
+    // Try to start up all services in TajoMaster.
+    // If anyone is failed, the master prints out the errors and immediately should shutdowns
+    try {
+      super.serviceInit(systemConf);
+    } catch (Throwable t) {
+      t.printStackTrace();
+      System.exit(1);
+    }
     LOG.info("Tajo Master is initialized.");
   }
 
@@ -477,10 +483,6 @@ public class TajoMaster extends CompositeService {
       return globalEngine;
     }
 
-    public Tablespace getStorageManager() {
-      return storeManager;
-    }
-
     public QueryCoordinatorService getTajoMasterService() {
       return tajoMasterService;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 31eecdc..7dbe815 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -844,8 +844,7 @@ public class TajoMasterClientService extends AbstractService {
         TableDesc desc;
         try {
           desc = context.getGlobalEngine().getDDLExecutor().createTable(queryContext, request.getName(),
-              meta.getStoreType(), schema,
-              meta, path, true, partitionDesc, false);
+              null, meta.getStoreType(), schema, meta, path.toUri(), true, partitionDesc, false);
         } catch (Exception e) {
           return TableResponse.newBuilder()
               .setResultCode(ResultCode.ERROR)

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 93c950e..5e0e639 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -36,11 +36,12 @@ import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TableSpaceManager;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -54,12 +55,10 @@ public class DDLExecutor {
 
   private final TajoMaster.MasterContext context;
   private final CatalogService catalog;
-  private final Tablespace tablespace;
 
   public DDLExecutor(TajoMaster.MasterContext context) {
     this.context = context;
     this.catalog = context.getCatalog();
-    this.tablespace = context.getStorageManager();
   }
 
   public boolean execute(QueryContext queryContext, LogicalPlan plan) throws IOException {
@@ -202,17 +201,31 @@ public class DDLExecutor {
     }
 
     if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
-      Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
+      Preconditions.checkState(createTable.hasUri(), "ERROR: LOCATION must be given.");
     }
 
-    return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(),
-        createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(),
-        createTable.getPartitionMethod(), ifNotExists);
+    return createTable(
+        queryContext,
+        createTable.getTableName(),
+        createTable.getTableSpaceName(),
+        createTable.getStorageType(),createTable.getTableSchema(),
+        meta,
+        createTable.getUri(),
+        createTable.isExternal(),
+        createTable.getPartitionMethod(),
+        ifNotExists);
   }
 
-  public TableDesc createTable(QueryContext queryContext, String tableName, String storeType,
-                               Schema schema, TableMeta meta, Path path, boolean isExternal,
-                               PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
+  public TableDesc createTable(QueryContext queryContext,
+                               String tableName,
+                               @Nullable String tableSpaceName,
+                               @Nullable String storeType,
+                               Schema schema,
+                               TableMeta meta,
+                               @Nullable URI uri,
+                               boolean isExternal,
+                               @Nullable PartitionMethodDesc partitionDesc,
+                               boolean ifNotExists) throws IOException {
     String databaseName;
     String simpleTableName;
     if (CatalogUtil.isFQTableName(tableName)) {
@@ -232,18 +245,28 @@ public class DDLExecutor {
         LOG.info("relation \"" + qualifiedName + "\" is already exists." );
         return catalog.getTableDesc(databaseName, simpleTableName);
       } else {
-        throw new AlreadyExistsTableException(CatalogUtil.buildFQName(databaseName, tableName));
+        throw new AlreadyExistsTableException(qualifiedName);
       }
     }
 
-    TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
-        schema, meta, (path != null ? path.toUri(): null), isExternal);
+    Tablespace tableSpace;
+    if (tableSpaceName != null) {
+      tableSpace = TableSpaceManager.getByName(tableSpaceName).get();
+    } else if (uri != null) {
+      tableSpace = TableSpaceManager.get(uri).get();
+    } else {
+      tableSpace = TableSpaceManager.getDefault();
+    }
+
+    TableDesc desc;
+    URI tableUri = isExternal ? uri : tableSpace.getTableUri(databaseName, simpleTableName);
+    desc = new TableDesc(qualifiedName, schema, meta, tableUri, isExternal);
 
     if (partitionDesc != null) {
       desc.setPartitionMethod(partitionDesc);
     }
 
-    TableSpaceManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
+    tableSpace.createTable(desc, ifNotExists);
 
     if (catalog.createTable(desc)) {
       LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
@@ -290,8 +313,7 @@ public class DDLExecutor {
 
     if (purge) {
       try {
-        TableSpaceManager.getStorageManager(queryContext.getConf(),
-            tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
+        TableSpaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc);
       } catch (IOException e) {
         throw new InternalError(e.getMessage());
       }
@@ -330,7 +352,7 @@ public class DDLExecutor {
 
       Path warehousePath = new Path(TajoConf.getWarehouseDir(context.getConf()), databaseName);
       TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
-      Path tablePath = new Path(tableDesc.getPath());
+      Path tablePath = new Path(tableDesc.getUri());
       if (tablePath.getParent() == null ||
           !tablePath.getParent().toUri().getPath().equals(warehousePath.toUri().getPath())) {
         throw new IOException("Can't truncate external table:" + eachTableName + ", data dir=" + tablePath +
@@ -340,7 +362,7 @@ public class DDLExecutor {
     }
 
     for (TableDesc eachTable: tableDescList) {
-      Path path = new Path(eachTable.getPath());
+      Path path = new Path(eachTable.getUri());
       LOG.info("Truncate table: " + eachTable.getName() + ", delete all data files in " + path);
       FileSystem fs = path.getFileSystem(context.getConf());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index 8f6c6f9..ae57453 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -101,7 +101,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
   }
 
   private void initSeqScanExec() throws IOException {
-    Tablespace tablespace = TableSpaceManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType());
+    Tablespace tablespace = TableSpaceManager.get(tableDesc.getUri()).get();
     List<Fragment> fragments = null;
     setPartition(tablespace);
     fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 281edad..480f45c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -43,24 +43,26 @@ import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
-import org.apache.tajo.master.*;
+import org.apache.tajo.master.QueryInfo;
+import org.apache.tajo.master.QueryManager;
+import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.exec.prehook.CreateTableHook;
 import org.apache.tajo.master.exec.prehook.DistributedQueryHookManager;
 import org.apache.tajo.master.exec.prehook.InsertIntoHook;
-import org.apache.tajo.plan.expr.EvalContext;
-import org.apache.tajo.plan.expr.GeneralFunctionEval;
-import org.apache.tajo.plan.function.python.PythonScriptEngine;
-import org.apache.tajo.plan.function.python.TajoScriptEngine;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.querymaster.*;
-import org.apache.tajo.session.Session;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.Target;
+import org.apache.tajo.plan.expr.EvalContext;
 import org.apache.tajo.plan.expr.EvalNode;
+import org.apache.tajo.plan.expr.GeneralFunctionEval;
+import org.apache.tajo.plan.function.python.PythonScriptEngine;
+import org.apache.tajo.plan.function.python.TajoScriptEngine;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
+import org.apache.tajo.querymaster.Query;
+import org.apache.tajo.querymaster.QueryMasterTask;
+import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.ProtoUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -329,104 +331,108 @@ public class QueryExecutor {
   }
 
   private void insertNonFromQuery(QueryContext queryContext,
-                                  InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder)
-      throws Exception {
-    String nodeUniqName = insertNode.getTableName() == null ? insertNode.getPath().getName() : insertNode.getTableName();
-    String queryId = nodeUniqName + "_" + System.currentTimeMillis();
-
-    FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
-    Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
-    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-
-    TableDesc tableDesc = null;
-    Path finalOutputDir = null;
-    if (insertNode.getTableName() != null) {
-      tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
-      finalOutputDir = new Path(tableDesc.getPath());
-    } else {
-      finalOutputDir = insertNode.getPath();
-    }
-
-    TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
-    taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
-
-    EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
-    StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
+                                  InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
     try {
-      exec.init();
-      exec.next();
-    } finally {
-      exec.close();
-    }
+      String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
+          insertNode.getTableName();
+      String queryId = nodeUniqName + "_" + System.currentTimeMillis();
+
+      FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+      Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
+      Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+
+      TableDesc tableDesc = null;
+      Path finalOutputDir;
+      if (insertNode.getTableName() != null) {
+        tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
+        finalOutputDir = new Path(tableDesc.getUri());
+      } else {
+        finalOutputDir = new Path(insertNode.getUri());
+      }
+
+      TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+      taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
 
-    if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
-      // it moves the original table into the temporary location.
-      // Then it moves the new result table into the original table location.
-      // Upon failed, it recovers the original table if possible.
-      boolean movedToOldTable = false;
-      boolean committed = false;
-      Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+      EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+      StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
       try {
-        if (fs.exists(finalOutputDir)) {
-          fs.rename(finalOutputDir, oldTableDir);
-          movedToOldTable = fs.exists(oldTableDir);
-        } else { // if the parent does not exist, make its parent directory.
-          fs.mkdirs(finalOutputDir.getParent());
-        }
-        fs.rename(stagingResultDir, finalOutputDir);
-        committed = fs.exists(finalOutputDir);
-      } catch (IOException ioe) {
-        // recover the old table
-        if (movedToOldTable && !committed) {
-          fs.rename(oldTableDir, finalOutputDir);
-        }
+        exec.init();
+        exec.next();
+      } finally {
+        exec.close();
       }
-    } else {
-      FileStatus[] files = fs.listStatus(stagingResultDir);
-      for (FileStatus eachFile: files) {
-        Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
-        if (fs.exists(targetFilePath)) {
-          targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+
+      if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+        // it moves the original table into the temporary location.
+        // Then it moves the new result table into the original table location.
+        // Upon failed, it recovers the original table if possible.
+        boolean movedToOldTable = false;
+        boolean committed = false;
+        Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+        try {
+          if (fs.exists(finalOutputDir)) {
+            fs.rename(finalOutputDir, oldTableDir);
+            movedToOldTable = fs.exists(oldTableDir);
+          } else { // if the parent does not exist, make its parent directory.
+            fs.mkdirs(finalOutputDir.getParent());
+          }
+          fs.rename(stagingResultDir, finalOutputDir);
+          committed = fs.exists(finalOutputDir);
+        } catch (IOException ioe) {
+          // recover the old table
+          if (movedToOldTable && !committed) {
+            fs.rename(oldTableDir, finalOutputDir);
+          }
+        }
+      } else {
+        FileStatus[] files = fs.listStatus(stagingResultDir);
+        for (FileStatus eachFile : files) {
+          Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
+          if (fs.exists(targetFilePath)) {
+            targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+          }
+          fs.rename(eachFile.getPath(), targetFilePath);
         }
-        fs.rename(eachFile.getPath(), targetFilePath);
       }
-    }
 
-    if (insertNode.hasTargetTable()) {
-      TableStats stats = tableDesc.getStats();
-      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-      stats.setNumBytes(volume);
-      stats.setNumRows(1);
+      if (insertNode.hasTargetTable()) {
+        TableStats stats = tableDesc.getStats();
+        long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+        stats.setNumBytes(volume);
+        stats.setNumRows(1);
 
-      CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
-      builder.setTableName(tableDesc.getName());
-      builder.setStats(stats.getProto());
+        CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
+        builder.setTableName(tableDesc.getName());
+        builder.setStats(stats.getProto());
 
-      catalog.updateTableStats(builder.build());
+        catalog.updateTableStats(builder.build());
 
-      responseBuilder.setTableDesc(tableDesc.getProto());
-    } else {
-      TableStats stats = new TableStats();
-      long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-      stats.setNumBytes(volume);
-      stats.setNumRows(1);
-
-      // Empty TableDesc
-      List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
-      CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
-          .setTableName(nodeUniqName)
-          .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
-          .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
-          .setStats(stats.getProto())
-          .build();
-
-      responseBuilder.setTableDesc(tableDescProto);
-    }
+        responseBuilder.setTableDesc(tableDesc.getProto());
+      } else {
+        TableStats stats = new TableStats();
+        long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
+        stats.setNumBytes(volume);
+        stats.setNumRows(1);
+
+        // Empty TableDesc
+        List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
+        CatalogProtos.TableDescProto tableDescProto = CatalogProtos.TableDescProto.newBuilder()
+            .setTableName(nodeUniqName)
+            .setMeta(CatalogProtos.TableProto.newBuilder().setStoreType("CSV").build())
+            .setSchema(CatalogProtos.SchemaProto.newBuilder().addAllFields(columns).build())
+            .setStats(stats.getProto())
+            .build();
+
+        responseBuilder.setTableDesc(tableDescProto);
+      }
 
-    // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
-    responseBuilder.setMaxRowNum(-1);
-    responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
-    responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+      // If queryId == NULL_QUERY_ID and MaxRowNum == -1, TajoCli prints only number of inserted rows.
+      responseBuilder.setMaxRowNum(-1);
+      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
+    } catch (Throwable t) {
+      throw new RuntimeException(t);
+    }
   }
 
   public void executeDistributedQuery(QueryContext queryContext, Session session,
@@ -436,14 +442,17 @@ public class QueryExecutor {
                                       SubmitQueryResponse.Builder responseBuilder) throws Exception {
     LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
-    String storeType = PlannerUtil.getStoreType(plan);
-    if (storeType != null) {
-      Tablespace sm = TableSpaceManager.getStorageManager(context.getConf(), storeType);
-      StorageProperty storageProperty = sm.getStorageProperty();
-      if (!storageProperty.isSupportsInsertInto()) {
+    TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot());
+    if (tableDesc != null) {
+
+      Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+      StorageProperty storageProperty = space.getProperty();
+
+      if (!storageProperty.isInsertable()) {
         throw new VerifyException("Inserting into non-file storage is not supported.");
       }
-      sm.beforeInsertOrCATS(rootNode.getChild());
+
+      space.prepareTable(rootNode.getChild());
     }
     context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
     hookManager.doHooks(queryContext, plan);
@@ -471,28 +480,15 @@ public class QueryExecutor {
     }
   }
 
-  public static MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
+  public MasterPlan compileMasterPlan(LogicalPlan plan, QueryContext context, GlobalPlanner planner)
       throws Exception {
 
-    String storeType = PlannerUtil.getStoreType(plan);
-    if (storeType != null) {
-      Tablespace sm = TableSpaceManager.getStorageManager(planner.getConf(), storeType);
-      StorageProperty storageProperty = sm.getStorageProperty();
-      if (storageProperty.isSortedInsert()) {
-        String tableName = PlannerUtil.getStoreTableName(plan);
-        LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-        TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
-        if (tableDesc == null) {
-          throw new VerifyException("Can't get table meta data from catalog: " + tableName);
-        }
-        List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
-            context, tableDesc);
-        if (storageSpecifiedRewriteRules != null) {
-          for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
-            eachRule.rewrite(context, plan);
-          }
-        }
-      }
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
+
+    if (tableDesc != null) {
+      Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+      space.rewritePlan(context, plan);
     }
 
     MasterPlan masterPlan = new MasterPlan(QueryIdFactory.NULL_QUERY_ID, context, plan);