You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:08:59 UTC

[07/15] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces.

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
index 5e3d0b6..d490001 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
@@ -19,13 +19,13 @@
 package org.apache.tajo.master.exec.prehook;
 
 import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.Tablespace;
 
 public class CreateTableHook implements DistributedQueryHook {
 
@@ -43,8 +43,10 @@ public class CreateTableHook implements DistributedQueryHook {
     String databaseName = splitted[0];
     String tableName = splitted[1];
     queryContext.setOutputTable(tableName);
-    queryContext.setOutputPath(
-        StorageUtil.concatPath(TajoConf.getWarehouseDir(queryContext.getConf()), databaseName, tableName));
+
+    // set the final output table uri
+    queryContext.setOutputPath(createTableNode.getUri());
+
     if(createTableNode.getPartitionMethod() != null) {
       queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
index 3dba176..f403092 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
@@ -37,7 +37,7 @@ public class DistributedQueryHookManager {
         try {
           hook.hook(queryContext, plan);
         } catch (Throwable t) {
-          t.printStackTrace();
+          throw new RuntimeException(t);
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
index 14c4d8d..c309f57 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
@@ -39,23 +39,21 @@ public class InsertIntoHook implements DistributedQueryHook {
 
     // Set QueryContext settings, such as output table name and output path.
     // It also remove data files if overwrite is true.
-    Path outputPath;
     if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
       queryContext.setOutputTable(insertNode.getTableName());
-      queryContext.setOutputPath(insertNode.getPath());
       if (insertNode.hasPartition()) {
         queryContext.setPartitionMethod(insertNode.getPartitionMethod());
       }
     } else { // INSERT INTO LOCATION ...
       // When INSERT INTO LOCATION, must not set output table.
-      outputPath = insertNode.getPath();
       queryContext.setFileOutput();
-      queryContext.setOutputPath(outputPath);
     }
 
+    // Set the final output table uri
+    queryContext.setOutputPath(insertNode.getUri());
+
     if (insertNode.isOverwrite()) {
       queryContext.setOutputOverwrite();
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 23808b5..4fef02c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.state.*;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryVars;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
@@ -48,6 +49,7 @@ import org.apache.tajo.master.event.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.QueryHistory;
 import org.apache.tajo.util.history.StageHistory;
@@ -427,40 +429,59 @@ public class Query implements EventHandler<QueryEvent> {
       } else {
         finalState = QueryState.QUERY_ERROR;
       }
+
+      // When a query is failed
       if (finalState != QueryState.QUERY_SUCCEEDED) {
         Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
-        if (lastStage != null && lastStage.getTableMeta() != null) {
-          String storeType = lastStage.getTableMeta().getStoreType();
-          if (storeType != null) {
-            LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
-            try {
-              TableSpaceManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
-            } catch (IOException e) {
-              LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
-            }
-          }
-        }
+        handleQueryFailure(query, lastStage);
       }
+
       query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
       query.setFinishTime();
 
       return finalState;
     }
 
+    // handle query failures
+    private void handleQueryFailure(Query query, Stage lastStage) {
+      QueryContext context = query.context.getQueryContext();
+
+      if (lastStage != null && context.hasOutputTableUri()) {
+        Tablespace space = TableSpaceManager.get(context.getOutputTableUri()).get();
+        try {
+          LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+          space.rollbackTable(rootNode.getChild());
+        } catch (IOException e) {
+          LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+        }
+      }
+    }
+
     private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
       Stage lastStage = query.getStage(event.getExecutionBlockId());
-      String storeType = lastStage.getTableMeta().getStoreType();
+
       try {
+
         LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
         CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
         TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
 
-        Path finalOutputDir = TableSpaceManager.getStorageManager(query.systemConf, storeType)
-            .commitOutputData(query.context.getQueryContext(),
-                lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
+        QueryContext queryContext = query.context.getQueryContext();
+
+        // If there is not tabledesc, it is a select query without insert or ctas.
+        // In this case, we should use default tablespace.
+        Tablespace space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+
+        Path finalOutputDir = space.commitTable(
+            query.context.getQueryContext(),
+            lastStage.getId(),
+            lastStage.getMasterPlan().getLogicalPlan(),
+            lastStage.getSchema(),
+            tableDesc);
 
         QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
         hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
+
       } catch (Exception e) {
         query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
         return QueryState.QUERY_ERROR;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 2809a70..84f2eac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.querymaster;
 
+import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -51,14 +52,9 @@ import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.verifier.VerifyException;
 import org.apache.tajo.session.Session;
-import org.apache.tajo.storage.Tablespace;
-import org.apache.tajo.storage.StorageProperty;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
 import org.apache.tajo.worker.AbstractResourceAllocator;
@@ -308,43 +304,28 @@ public class QueryMasterTask extends CompositeService {
   }
 
   public synchronized void startQuery() {
-    Tablespace sm = null;
     LogicalPlan plan = null;
+    Tablespace space = null;
     try {
       if (query != null) {
         LOG.warn("Query already started");
         return;
       }
+
+
       CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
-      LogicalPlanner planner = new LogicalPlanner(catalog);
+      LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
       LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
       jsonExpr = null; // remove the possible OOM
-      plan = planner.createPlan(queryContext, expr);
-
-      String storeType = PlannerUtil.getStoreType(plan);
-      if (storeType != null) {
-        sm = TableSpaceManager.getStorageManager(systemConf, storeType);
-        StorageProperty storageProperty = sm.getStorageProperty();
-        if (storageProperty.isSortedInsert()) {
-          String tableName = PlannerUtil.getStoreTableName(plan);
-          LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-          TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
-          if (tableDesc == null) {
-            throw new VerifyException("Can't get table meta data from catalog: " + tableName);
-          }
-          List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
-              getQueryTaskContext().getQueryContext(), tableDesc);
-          if (storageSpecifiedRewriteRules != null) {
-            for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
-              optimizer.addRuleAfterToJoinOpt(eachRule);
-            }
-          }
-        }
-      }
 
+      plan = planner.createPlan(queryContext, expr);
       optimizer.optimize(queryContext, plan);
 
+      // when a given uri is null, TableSpaceManager.get will return the default tablespace.
+      space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+      space.rewritePlan(queryContext, plan);
+
       for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
         LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
         if (scanNodes != null) {
@@ -374,10 +355,10 @@ public class QueryMasterTask extends CompositeService {
       LOG.error(t.getMessage(), t);
       initError = t;
 
-      if (plan != null && sm != null) {
+      if (plan != null && space != null) {
         LogicalRootNode rootNode = plan.getRootBlock().getRoot();
         try {
-          sm.rollbackOutputCommit(rootNode.getChild());
+          space.rollbackTable(rootNode.getChild());
         } catch (IOException e) {
           LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
         }
@@ -422,16 +403,27 @@ public class QueryMasterTask extends CompositeService {
     // Create Output Directory
     ////////////////////////////////////////////
 
-    String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
-    if (context.isCreateTable() || context.isInsert()) {
-      if (outputPath == null || outputPath.isEmpty()) {
-        // hbase
-        stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+    String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
+
+    // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
+    // So, this query results won't be materialized as a part of a table.
+    // The result will be temporarily written in the staging directory.
+    if (outputPath.isEmpty()) {
+      // for temporarily written in the storage directory
+      stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+    } else {
+      Optional<Tablespace> spaceResult = TableSpaceManager.get(outputPath);
+      if (!spaceResult.isPresent()) {
+        throw new IOException("No registered Tablespace for " + outputPath);
+      }
+
+      Tablespace space = spaceResult.get();
+      if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
+        // If this space allows move operation, the staging directory will be underneath the final output table uri.
+        stagingDir = StorageUtil.concatPath(context.getOutputTableUri().toString(), TMP_STAGING_DIR_PREFIX, queryId);
       } else {
-        stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+        stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
       }
-    } else {
-      stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
     }
 
     // initializ

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 2c3e9e2..5b8f24a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -36,7 +36,6 @@ import org.apache.tajo.engine.planner.UniformRangePartition;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil;
 import org.apache.tajo.engine.utils.TupleUtil;
@@ -83,19 +82,14 @@ public class Repartitioner {
     QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext();
 
     ScanNode[] scans = execBlock.getScanNodes();
-
-    Path tablePath;
     Fragment[] fragments = new Fragment[scans.length];
     long[] stats = new long[scans.length];
 
     // initialize variables from the child operators
     for (int i = 0; i < scans.length; i++) {
       TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
-      if (tableDesc == null) { // if it is a real table stored on storage
-        FileTablespace storageManager =
-            (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
 
-        tablePath = storageManager.getTablePath(scans[i].getTableName());
+      if (tableDesc == null) { // if it is a real table stored on storage
         if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) {
           for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) {
             ExecutionBlockId originScanEbId = unionScanEntry.getKey();
@@ -105,25 +99,29 @@ public class Repartitioner {
           ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName());
           stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes();
         }
-        fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+
+        // TODO - We should remove dummy flagment usages
+        fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path("/dummy"), 0, 0,
+            new String[]{UNKNOWN_HOST});
+
       } else {
+
         try {
           stats[i] = GlobalPlanRewriteUtil.computeDescendentVolume(scans[i]);
         } catch (PlanningException e) {
           throw new IOException(e);
         }
 
-        Tablespace tablespace =
-            TableSpaceManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType());
-
         // if table has no data, tablespace will return empty FileFragment.
         // So, we need to handle FileFragment by its size.
         // If we don't check its size, it can cause IndexOutOfBoundsException.
-        List<Fragment> fileFragments = tablespace.getSplits(scans[i].getCanonicalName(), tableDesc);
+        Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+        List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc);
         if (fileFragments.size() > 0) {
           fragments[i] = fileFragments.get(0);
         } else {
-          fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path(tableDesc.getPath()), 0, 0, new String[]{UNKNOWN_HOST});
+          fragments[i] = new FileFragment(scans[i].getCanonicalName(),
+              new Path(tableDesc.getUri()), 0, 0, new String[]{UNKNOWN_HOST});
         }
       }
     }
@@ -377,26 +375,29 @@ public class Repartitioner {
     if (broadcastFragments != null) {
       //In this phase a ScanNode has a single fragment.
       //If there are more than one data files, that files should be added to fragments or partition path
+
       for (ScanNode eachScan: broadcastScans) {
+
         Path[] partitionScanPaths = null;
         TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
+        Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+
         if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
-          FileTablespace storageManager =
-              (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
 
           PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
           partitionScanPaths = partitionScan.getInputPaths();
           // set null to inputPaths in getFragmentsFromPartitionedTable()
-          getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc);
+          getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc);
           partitionScan.setInputPaths(partitionScanPaths);
+
         } else {
-          Tablespace tablespace = TableSpaceManager.getStorageManager(stage.getContext().getConf(),
-              tableDesc.getMeta().getStoreType());
-          Collection<Fragment> scanFragments = tablespace.getSplits(eachScan.getCanonicalName(),
+
+          Collection<Fragment> scanFragments = space.getSplits(eachScan.getCanonicalName(),
               tableDesc, eachScan);
           if (scanFragments != null) {
             rightFragments.addAll(scanFragments);
           }
+
         }
       }
     }
@@ -505,18 +506,16 @@ public class Repartitioner {
 
       Collection<Fragment> scanFragments;
       Path[] partitionScanPaths = null;
+
+      FileTablespace space = (FileTablespace) TableSpaceManager.get(desc.getUri()).get();
+
       if (scan.getType() == NodeType.PARTITIONS_SCAN) {
         PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
         partitionScanPaths = partitionScan.getInputPaths();
         // set null to inputPaths in getFragmentsFromPartitionedTable()
-        FileTablespace storageManager =
-            (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
-        scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc);
+        scanFragments = getFragmentsFromPartitionedTable(space, scan, desc);
       } else {
-        Tablespace tablespace =
-            TableSpaceManager.getStorageManager(stage.getContext().getConf(), desc.getMeta().getStoreType());
-
-        scanFragments = tablespace.getSplits(scan.getCanonicalName(), desc, scan);
+        scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan);
       }
 
       if (scanFragments != null) {
@@ -618,9 +617,6 @@ public class Repartitioner {
       throws IOException {
     ExecutionBlock execBlock = stage.getBlock();
     ScanNode scan = execBlock.getScanNodes()[0];
-    Path tablePath;
-    tablePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()))
-        .getTablePath(scan.getTableName());
 
     ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0);
     SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
@@ -648,10 +644,15 @@ public class Repartitioner {
         throw new IOException("Can't get table meta data from catalog: " +
             PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
       }
-      ranges = TableSpaceManager.getStorageManager(stage.getContext().getConf(), storeType)
-          .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc,
-              sortNode.getInSchema(), sortSpecs,
-              mergedRange);
+
+      Tablespace space = TableSpaceManager.getAnyByScheme(storeType).get();
+      ranges = space.getInsertSortRanges(
+          stage.getContext().getQueryContext(),
+          tableDesc,
+          sortNode.getInSchema(),
+          sortSpecs,
+          mergedRange);
+
       determinedTaskNum = ranges.length;
     } else {
       RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
@@ -688,7 +689,9 @@ public class Repartitioner {
       }
     }
 
-    FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+    // TODO - We should remove dummy fragment.
+    FileFragment dummyFragment = new FileFragment(scan.getTableName(), new Path("/dummy"), 0, 0,
+        new String[]{UNKNOWN_HOST});
     Stage.scheduleFragment(stage, dummyFragment);
 
     List<FetchImpl> fetches = new ArrayList<FetchImpl>();
@@ -784,11 +787,9 @@ public class Repartitioner {
                                                  int maxNum) throws IOException {
     ExecutionBlock execBlock = stage.getBlock();
     ScanNode scan = execBlock.getScanNodes()[0];
-    Path tablePath;
-    tablePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()))
-        .getTablePath(scan.getTableName());
 
-    Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+    // TODO - We should remove dummy fragment usages
+    Fragment frag = new FileFragment(scan.getCanonicalName(), new Path("/dummy"), 0, 0, new String[]{UNKNOWN_HOST});
     List<Fragment> fragments = new ArrayList<Fragment>();
     fragments.add(frag);
     Stage.scheduleFragments(stage, fragments);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 5a0fc38..a7d605c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -60,8 +60,8 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
@@ -1084,18 +1084,18 @@ public class Stage implements EventHandler<StageEvent> {
       Collection<Fragment> fragments;
       TableMeta meta = table.getMeta();
 
+      Tablespace tablespace = TableSpaceManager.get(scan.getTableDesc().getUri()).get();
+
       // Depending on scanner node's type, it creates fragments. If scan is for
       // a partitioned table, It will creates lots fragments for all partitions.
       // Otherwise, it creates at least one fragments for a table, which may
       // span a number of blocks or possibly consists of a number of files.
+      //
+      // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN.
       if (scan.getType() == NodeType.PARTITIONS_SCAN) {
         // After calling this method, partition paths are removed from the physical plan.
-        FileTablespace storageManager =
-            (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
-        fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
+        fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table);
       } else {
-        Tablespace tablespace =
-            TableSpaceManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType());
         fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan);
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index f265e50..ae22d0d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -486,7 +486,7 @@ public class QueryExecutorServlet extends HttpServlet {
       if (resultRows <= 0) {
         resultRows = 1000;
       }
-      LOG.info("Tajo Query Result: " + desc.getPath() + "\n");
+      LOG.info("Tajo Query Result: " + desc.getUri() + "\n");
 
       int numOfColumns = rsmd.getColumnCount();
       for(int i = 0; i < numOfColumns; i++) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
index 0721ef1..0df5d4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
@@ -160,7 +160,7 @@ public class LegacyTaskImpl implements Task {
         this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
-      Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
+      Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
           .getAppenderFilePath(getId(), queryContext.getStagingDir());
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index fbd070e..66c8e4a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -40,7 +40,6 @@ import org.apache.tajo.function.FunctionSignature;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.service.ServiceTrackerException;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.service.TajoMasterInfo;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
@@ -56,7 +55,7 @@ import org.apache.tajo.rule.EvaluationFailedException;
 import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
 import org.apache.tajo.rule.SelfDiagnosisRuleSession;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.OldStorageManager;
 import org.apache.tajo.util.*;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
@@ -370,7 +369,7 @@ public class TajoWorker extends CompositeService {
     }
 
     try {
-      TableSpaceManager.shutdown();
+      OldStorageManager.shutdown();
     } catch (IOException ie) {
       LOG.error(ie.getMessage(), ie);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index be3960b..5974693 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -154,7 +154,7 @@ public class TaskImpl implements Task {
         this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
-      Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
+      Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
           .getAppenderFilePath(getId(), queryContext.getStagingDir());
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
index 43ec5ca..6771912 100644
--- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
@@ -30,7 +30,6 @@
 <%@ page import="java.util.Collection" %>
 <%@ page import="java.util.List" %>
 <%@ page import="java.util.Map" %>
-<%@ page import="org.apache.tajo.service.ServiceTracker" %>
 <%@ page import="java.net.InetSocketAddress" %>
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -186,7 +185,7 @@
           <div style='margin-top:10px'>
             <div style=''>Detail</div>
             <table border="1" class='border_table'>
-              <tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getPath()%></td></tr>
+              <tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getUri()%></td></tr>
               <tr><td>Store type</td><td><%=tableDesc.getMeta().getStoreType()%></td></tr>
               <tr><td># rows</td><td><%=(tableDesc.hasStats() ? ("" + tableDesc.getStats().getNumRows()) : "-")%></td></tr>
               <tr><td>Volume</td><td><%=(tableDesc.hasStats() ? FileUtil.humanReadableByteCount(tableDesc.getStats().getNumBytes(),true) : "-")%></td></tr>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index e0cf876..43bb6c1 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -23,20 +23,19 @@
 <%@ page import="org.apache.tajo.conf.TajoConf" %>
 <%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %>
 <%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.service.ServiceTracker" %>
-<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.master.QueryInProgress" %>
 <%@ page import="org.apache.tajo.master.rm.Worker" %>
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.storage.TableSpaceManager" %>
+<%@ page import="org.apache.tajo.storage.Tablespace" %>
 <%@ page import="org.apache.tajo.util.NetUtils" %>
 <%@ page import="org.apache.tajo.util.TUtil" %>
 <%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="java.util.List" %>
-<%@ page import="java.util.Collection" %>
+<%@ page import="java.net.InetSocketAddress" %>
 <%@ page import="java.util.Date" %>
+<%@ page import="java.util.List" %>
 <%@ page import="java.util.Map" %>
-<%@ page import="java.net.InetSocketAddress" %>
-<%@ page import="org.apache.tajo.service.ServiceTracker" %>
 
 <%
   TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -139,7 +138,15 @@
     <tr><td width='150'>Threads:</td><td><a href='thread.jsp'>thread dump...</a></tr>
   </table>
   <hr/>
-
+  <h3>Tablespaces</h3>
+  <table width="100%" class="border_table" border="1">
+    <tr><th>Tablespace Name</th><th>URI</th><th>Handler</th></tr>
+    <% for (Tablespace space : TableSpaceManager.getAllTablespaces()) {
+      if (space.isVisible()) { %>
+    <tr><td><%=space.getName()%></td><td><%=space.getUri()%></td><td><%=space.getClass().getName()%></td></tr>
+    <% }}%>
+  </table>
+  <hr/>
   <h3>Cluster Summary</h3>
   <table width="100%" class="border_table" border="1">
     <tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Running Master</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></tr>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
index b5be9d0..ca2378b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -46,7 +46,7 @@ public class BackendTestingUtil {
 
   public static void writeTmpTable(TajoConf conf, Path tablePath)
       throws IOException {
-    FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm = TableSpaceManager.getDefault();
     Appender appender;
 
     Path filePath = new Path(tablePath, "table.csv");

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 9a92e90..57b1e18 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -945,7 +945,7 @@ public class QueryTestCaseBase {
       return null;
     }
 
-    Path path = new Path(tableDesc.getPath());
+    Path path = new Path(tableDesc.getUri());
     return getTableFileContents(path);
   }
 
@@ -955,7 +955,7 @@ public class QueryTestCaseBase {
       return null;
     }
 
-    Path path = new Path(tableDesc.getPath());
+    Path path = new Path(tableDesc.getUri());
     FileSystem fs = path.getFileSystem(conf);
 
     return listFiles(fs, path);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 9b5980b..acdae85 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.client.TajoClientUtil;
@@ -48,16 +47,18 @@ import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;
 import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.Pair;
 import org.apache.tajo.worker.TajoWorker;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
@@ -345,10 +346,18 @@ public class TajoTestingCluster {
     LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
 
     if (!local) {
-      c.setVar(ConfVars.ROOT_DIR,
-          getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
+      c.setVar(ConfVars.ROOT_DIR, getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
     } else {
-      c.setVar(ConfVars.ROOT_DIR, testBuildDir.getAbsolutePath() + "/tajo");
+      c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
+    }
+
+    // Do not need for local file system
+    if (!local) {
+      FileTablespace defaultTableSpace =
+          new FileTablespace(TableSpaceManager.DEFAULT_TABLESPACE_NAME, TajoConf.getWarehouseDir(c).toUri());
+      defaultTableSpace.init(conf);
+
+      TableSpaceManager.addTableSpaceForTest(defaultTableSpace);
     }
 
     setupCatalogForTesting(c, testBuildDir);
@@ -441,13 +450,6 @@ public class TajoTestingCluster {
     }
   }
 
-  public void restartTajoCluster(int numSlaves) throws Exception {
-    tajoMaster.stop();
-    tajoMaster.start();
-
-    LOG.info("Minicluster has been restarted");
-  }
-
   public TajoMaster getMaster() {
     return this.tajoMaster;
   }
@@ -653,7 +655,14 @@ public class TajoTestingCluster {
       if (!fs.exists(rootDir)) {
         fs.mkdirs(rootDir);
       }
-      Path tablePath = new Path(rootDir, tableName);
+      Path tablePath;
+      if (CatalogUtil.isFQTableName(tableName)) {
+        Pair<String, String> name = CatalogUtil.separateQualifierAndName(tableName);
+        tablePath = new Path(rootDir, new Path(name.getFirst(), name.getSecond()));
+      } else {
+        tablePath = new Path(rootDir, tableName);
+      }
+
       fs.mkdirs(tablePath);
       if (tableDatas.length > 0) {
         int recordPerFile = tableDatas.length / numDataFiles;

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index 54e50fc..ce951c6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.FileUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -214,10 +215,9 @@ public class TestTajoCli {
 
     String consoleResult = new String(out.toByteArray());
 
-    FileSystem fs = FileSystem.get(testBase.getTestingCluster().getConfiguration());
     if (!cluster.isHiveCatalogStoreRunning()) {
       assertOutputResult(resultFileName, consoleResult, new String[]{"${table.path}"},
-        new String[]{fs.getUri() + "/tajo/warehouse/default/" + tableName});
+        new String[]{TableSpaceManager.getDefault().getTableUri("default", tableName).toString()});
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 765a084..73b97fa 100644
--- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -324,7 +324,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -345,7 +345,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -422,7 +422,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -446,7 +446,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -470,7 +470,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -495,7 +495,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -521,7 +521,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -574,7 +574,7 @@ public class TestTajoClient {
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));
 
-    Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+    Path tablePath = new Path(client.getTableDesc(tableName).getUri());
     FileSystem hdfs = tablePath.getFileSystem(conf);
     assertTrue(hdfs.exists(tablePath));
 
@@ -704,7 +704,7 @@ public class TestTajoClient {
 
     assertEquals(resultDesc.getMeta().getOption(StorageConstants.TEXT_NULL), "\\\\T");
 
-    Path path = new Path(resultDesc.getPath());
+    Path path = new Path(resultDesc.getUri());
     FileSystem fs = path.getFileSystem(tajoConf);
 
     FileStatus[] files = fs.listStatus(path);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 66aedc0..328f883 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -50,6 +50,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.VerificationState;
 import org.apache.tajo.storage.LazyTuple;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.BytesUtils;
@@ -103,7 +104,7 @@ public class ExprTestBase {
 
     analyzer = new SQLAnalyzer();
     preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
-    planner = new LogicalPlanner(cat);
+    planner = new LogicalPlanner(cat, TableSpaceManager.getInstance());
     optimizer = new LogicalOptimizer(util.getConfiguration());
     annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 4bfe640..80f3459 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -45,6 +45,7 @@ import org.apache.tajo.plan.function.GeneralFunction;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.nameresolver.NameResolvingMode;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.AfterClass;
@@ -116,7 +117,7 @@ public class TestEvalTreeUtil {
     catalog.createFunction(funcMeta);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
 
     String[] QUERIES = {
         "select name, score, age from people where score > 30", // 0

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index a408fd6..9aa7ddf 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -24,7 +24,6 @@ import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.engine.function.FunctionLoader;
 import org.apache.tajo.engine.function.builtin.SumInt;
@@ -35,6 +34,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.junit.AfterClass;
@@ -103,7 +103,7 @@ public class TestLogicalOptimizer {
 
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
     optimizer = new LogicalOptimizer(util.getConfiguration());
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index cee1593..3cee816 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner;
 
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.graph.SimpleDirectedGraph;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -39,7 +40,7 @@ public class TestLogicalPlan {
   public static void setup() throws Exception {
     util = new TajoTestingCluster();
     util.startCatalogCluster();
-    planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog());
+    planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TableSpaceManager.getInstance());
   }
 
   public static void tearDown() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 1feea4c..351a6af 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
@@ -29,7 +30,6 @@ import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.engine.function.FunctionLoader;
@@ -42,6 +42,7 @@ import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;
@@ -130,7 +131,7 @@ public class TestLogicalPlanner {
 
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
   }
 
   @AfterClass
@@ -155,6 +156,13 @@ public class TestLogicalPlanner {
       "select length(name), length(deptname), *, empid+10 from employee where empId > 500", // 13
   };
 
+  private static QueryContext createQueryContext() {
+    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    qc.put(QueryVars.DEFAULT_SPACE_URI, "file:/");
+    qc.put(QueryVars.DEFAULT_SPACE_ROOT_URI, "file:/");
+    return qc;
+  }
+
   public static final void testCloneLogicalNode(LogicalNode n1) throws CloneNotSupportedException {
     LogicalNode copy = (LogicalNode) n1.clone();
     assertTrue(n1.deepEquals(copy));
@@ -162,7 +170,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testSingleRelation() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(QUERIES[0]);
     LogicalPlan planNode = planner.createPlan(qc, expr);
@@ -196,7 +204,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testImplicityJoinPlan() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     // two relations
     Expr expr = sqlAnalyzer.parse(QUERIES[1]);
@@ -285,7 +293,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testNaturalJoinPlan() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
     // two relations
     Expr context = sqlAnalyzer.parse(JOINS[0]);
     LogicalNode plan = planner.createPlan(qc, context).getRootBlock().getRoot();
@@ -317,7 +325,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testInnerJoinPlan() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
     // two relations
     Expr expr = sqlAnalyzer.parse(JOINS[1]);
     LogicalPlan plan = planner.createPlan(qc, expr);
@@ -350,7 +358,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testOuterJoinPlan() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     // two relations
     Expr expr = sqlAnalyzer.parse(JOINS[2]);
@@ -385,7 +393,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testGroupby() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     // without 'having clause'
     Expr context = sqlAnalyzer.parse(QUERIES[7]);
@@ -429,7 +437,7 @@ public class TestLogicalPlanner {
   public final void testMultipleJoin() throws IOException, PlanningException {
     Expr expr = sqlAnalyzer.parse(
         FileUtil.readTextFile(new File("src/test/resources/queries/TestJoinQuery/testTPCHQ2Join.sql")));
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
     LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
     testJsonSerDerObject(plan);
     Schema expected = tpch.getOutSchema("q2");
@@ -488,7 +496,7 @@ public class TestLogicalPlanner {
     Expr expr = sqlAnalyzer.parse(
         FileUtil.readTextFile(new File
             ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual1.sql")));
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     LogicalPlan plan = planner.createPlan(qc, expr);
     LogicalNode node = plan.getRootBlock().getRoot();
@@ -518,7 +526,7 @@ public class TestLogicalPlanner {
     }
 
     for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) {
-      if (!entry.getValue().booleanValue()) {
+      if (!entry.getValue()) {
         Preconditions.checkArgument(false,
             "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson());
       }
@@ -530,7 +538,7 @@ public class TestLogicalPlanner {
     Expr expr = sqlAnalyzer.parse(
         FileUtil.readTextFile(new File
             ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual2.sql")));
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     LogicalPlan plan = planner.createPlan(qc,expr);
     LogicalNode node = plan.getRootBlock().getRoot();
@@ -559,7 +567,7 @@ public class TestLogicalPlanner {
     }
 
     for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) {
-      if (!entry.getValue().booleanValue()) {
+      if (!entry.getValue()) {
         Preconditions.checkArgument(false,
             "SelectionQual not found. -> required JoinQual:" + entry.getKey().toJson());
       }
@@ -571,7 +579,7 @@ public class TestLogicalPlanner {
     Expr expr = sqlAnalyzer.parse(
         FileUtil.readTextFile(new File
             ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual3.sql")));
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     LogicalPlan plan = planner.createPlan(qc, expr);
     LogicalNode node = plan.getRootBlock().getRoot();
@@ -605,7 +613,7 @@ public class TestLogicalPlanner {
     }
 
     for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) {
-      if (!entry.getValue().booleanValue()) {
+      if (!entry.getValue()) {
         Preconditions.checkArgument(false,
             "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson());
       }
@@ -618,7 +626,7 @@ public class TestLogicalPlanner {
     Expr expr = sqlAnalyzer.parse(
         FileUtil.readTextFile(new File
             ("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual4.sql")));
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     LogicalPlan plan = planner.createPlan(qc, expr);
     LogicalNode node = plan.getRootBlock().getRoot();
@@ -675,14 +683,14 @@ public class TestLogicalPlanner {
 
 
     for (Map.Entry<BinaryEval, Boolean> entry : joinQualMap.entrySet()) {
-      if (!entry.getValue().booleanValue()) {
+      if (!entry.getValue()) {
         Preconditions.checkArgument(false,
             "JoinQual not found. -> required JoinQual:" + entry.getKey().toJson());
       }
     }
 
     for (Map.Entry<BinaryEval, Boolean> entry : scanMap.entrySet()) {
-      if (!entry.getValue().booleanValue()) {
+      if (!entry.getValue()) {
         Preconditions.checkArgument(false,
             "ScanQual not found. -> required JoinQual:" + entry.getKey().toJson());
       }
@@ -709,7 +717,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testStoreTable() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr context = sqlAnalyzer.parse(QUERIES[8]);
 
@@ -727,7 +735,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testOrderBy() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(QUERIES[4]);
 
@@ -757,7 +765,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testLimit() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(QUERIES[12]);
 
@@ -779,7 +787,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testSPJPush() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(QUERIES[5]);
     LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -801,7 +809,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testSPJ() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(QUERIES[6]);
     LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -811,7 +819,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testJson() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
 	  Expr expr = sqlAnalyzer.parse(QUERIES[9]);
 	  LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -833,7 +841,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testVisitor() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     // two relations
     Expr expr = sqlAnalyzer.parse(QUERIES[1]);
@@ -860,7 +868,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testExprNode() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(QUERIES[10]);
     LogicalPlan rootNode = planner.createPlan(qc, expr);
@@ -882,7 +890,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testAsterisk() throws CloneNotSupportedException, PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(QUERIES[13]);
     LogicalPlan planNode = planner.createPlan(qc, expr);
@@ -912,7 +920,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testAlias1() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(ALIAS[0]);
     LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -940,7 +948,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testAlias2() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(ALIAS[1]);
     LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -961,7 +969,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testCreateTableDef() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(CREATE_TABLE[0]);
     LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -980,7 +988,7 @@ public class TestLogicalPlanner {
     assertEquals("score", def.getColumn(3).getSimpleName());
     assertEquals(Type.FLOAT4, def.getColumn(3).getDataType().getType());
     assertTrue("CSV".equalsIgnoreCase(createTable.getStorageType()));
-    assertEquals("/tmp/data", createTable.getPath().toString());
+    assertEquals("/tmp/data", createTable.getUri().toString());
     assertTrue(createTable.hasOptions());
     assertEquals("|", createTable.getOptions().get("csv.delimiter"));
   }
@@ -1047,7 +1055,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testSetPlan() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(setStatements[0]);
     LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -1068,7 +1076,7 @@ public class TestLogicalPlanner {
 
   @Test
   public void testSetQualifier() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr context = sqlAnalyzer.parse(setQualifiers[0]);
     LogicalNode plan = planner.createPlan(qc, context).getRootBlock().getRoot();
@@ -1121,7 +1129,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testInsertInto0() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(insertStatements[0]);
     LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1134,7 +1142,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testInsertInto1() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(insertStatements[1]);
     LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1146,7 +1154,7 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testInsertInto2() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(insertStatements[2]);
     LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1161,19 +1169,19 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testInsertInto3() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(insertStatements[3]);
     LogicalPlan plan = planner.createPlan(qc, expr);
     assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertFalse(insertNode.isOverwrite());
-    assertTrue(insertNode.hasPath());
+    assertTrue(insertNode.hasUri());
   }
 
   @Test
   public final void testInsertInto4() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(insertStatements[4]);
     LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1189,19 +1197,19 @@ public class TestLogicalPlanner {
 
   @Test
   public final void testInsertInto5() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(insertStatements[5]);
     LogicalPlan plan = planner.createPlan(qc, expr);
     assertEquals(1, plan.getQueryBlocks().size());
     InsertNode insertNode = getInsertNode(plan);
     assertTrue(insertNode.isOverwrite());
-    assertTrue(insertNode.hasPath());
+    assertTrue(insertNode.hasUri());
   }
 
   @Test
   public final void testInsertInto6() throws PlanningException {
-    QueryContext qc = new QueryContext(util.getConfiguration(), session);
+    QueryContext qc = createQueryContext();
 
     Expr expr = sqlAnalyzer.parse(insertStatements[6]);
     LogicalPlan plan = planner.createPlan(qc, expr);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 0082800..d62eed2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -39,6 +39,7 @@ import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
@@ -108,7 +109,7 @@ public class TestPlannerUtil {
 
     catalog.createFunction(funcDesc);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
   }
 
   @AfterClass
@@ -337,7 +338,7 @@ public class TestPlannerUtil {
 
     TableDesc tableDesc = new TableDesc();
     tableDesc.setName("Test");
-    tableDesc.setPath(path.toUri());
+    tableDesc.setUri(path.toUri());
 
     FileSystem fs = path.getFileSystem(util.getConfiguration());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index e7e4f7d..2464fb1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -86,7 +86,7 @@ public class TestBNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, employeePath);
     appender.init();
     VTuple tuple = new VTuple(schema.size());
@@ -108,8 +108,7 @@ public class TestBNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
-        .getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
@@ -125,7 +124,7 @@ public class TestBNLJoinExec {
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
   }
 
   @After
@@ -150,10 +149,10 @@ public class TestBNLJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
 
     FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
-        new Path(employee.getPath()),
+        new Path(employee.getUri()),
         Integer.MAX_VALUE);
     FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
-        new Path(people.getPath()),
+        new Path(people.getUri()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testBNLCrossJoin");
@@ -183,9 +182,9 @@ public class TestBNLJoinExec {
         context).getRootBlock().getRoot();
 
     FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
-        new Path(employee.getPath()), Integer.MAX_VALUE);
+        new Path(employee.getUri()), Integer.MAX_VALUE);
     FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
-        new Path(people.getPath()), Integer.MAX_VALUE);
+        new Path(people.getUri()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 5a7ba6a..96a1f36 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -90,7 +90,7 @@ public class TestBSTIndexExec {
     Path workDir = CommonTestingUtil.getTestDir();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+    sm = TableSpaceManager.getLocalFs();
 
     idxPath = new Path(workDir, "test.idx");
 
@@ -144,11 +144,11 @@ public class TestBSTIndexExec {
 
     TableDesc desc = new TableDesc(
         CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
-        sm.getTablePath("employee").toUri());
+        sm.getTableUri(TajoConstants.DEFAULT_DATABASE_NAME, "employee"));
     catalog.createTable(desc);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 8e2f234..d94d3f6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -82,7 +82,7 @@ public class TestExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
@@ -104,7 +104,7 @@ public class TestExternalSortExec {
     employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
     catalog.createTable(employee);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
   }
 
   @After
@@ -120,7 +120,7 @@ public class TestExternalSortExec {
   @Test
   public final void testNext() throws IOException, PlanningException {
     FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
-        new Path(employee.getPath()), Integer.MAX_VALUE);
+        new Path(employee.getUri()), Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
         LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index fb6fd02..21a101a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -104,8 +104,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
-        .getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     VTuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -134,8 +133,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
-        .getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     VTuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -174,8 +172,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
-        .getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     VTuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -227,7 +224,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
         .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
 
@@ -237,7 +234,7 @@ public class TestFullOuterHashJoinExec {
     catalog.createTable(phone3);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog);
+    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
   }
@@ -266,9 +263,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+    FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
 
@@ -305,9 +302,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
@@ -343,9 +340,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
         Integer.MAX_VALUE);
-    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
 
@@ -382,9 +379,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
         Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+    FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);