You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/25 16:36:45 UTC

[52/57] [abbrv] tajo git commit: TAJO-1613: Rename StorageManager to Tablespace.

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 8c72d39..7f92667 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -80,7 +80,7 @@ public class TestPhysicalPlanner {
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
-  private static FileStorageManager sm;
+  private static FileTablespace sm;
   private static Path testDir;
   private static Session session = LocalTajoTestingUtility.createDummySession();
   private static QueryContext defaultContext;
@@ -98,7 +98,7 @@ public class TestPhysicalPlanner {
     util.startCatalogCluster();
     conf = util.getConfiguration();
     testDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner");
-    sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
     catalog = util.getMiniCatalogCluster().getCatalog();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
@@ -180,7 +180,7 @@ public class TestPhysicalPlanner {
 
     Schema scoreSchmea = score.getSchema();
     TableMeta scoreLargeMeta = CatalogUtil.newTableMeta("RAW", new KeyValueSet());
-    Appender appender =  ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender =  ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath);
     appender.enableStats();
     appender.init();
@@ -237,7 +237,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCreateScanPlan() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -268,7 +268,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCreateScanWithFilterPlan() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateScanWithFilterPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -297,7 +297,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByPlan() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -328,7 +328,7 @@ public class TestPhysicalPlanner {
   @Test
   public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException {
     // TODO - currently, this query does not use hash-based group operator.
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + 
         "/testHashGroupByPlanWithALLField");
@@ -358,7 +358,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testSortGroupByPlan() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -421,7 +421,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testStorePlan() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -442,7 +442,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner =  ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Scanner scanner =  ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
@@ -467,7 +467,7 @@ public class TestPhysicalPlanner {
     TableStats stats = largeScore.getStats();
     assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB);
 
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(),
         new Path(largeScore.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithMaxOutputFileSize");
 
@@ -502,7 +502,7 @@ public class TestPhysicalPlanner {
     // checking the file contents
     long totalNum = 0;
     for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
-      Scanner scanner =  ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
+      Scanner scanner =  ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
           CatalogUtil.newTableMeta("CSV"),
           rootNode.getOutSchema(),
           status.getPath());
@@ -518,7 +518,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testStorePlanWithRCFile() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlanWithRCFile");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -539,7 +539,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
+    Scanner scanner = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getFileScanner(
         outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
@@ -559,7 +559,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testEnforceForDefaultColumnPartitionStorePlan() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -586,7 +586,7 @@ public class TestPhysicalPlanner {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.HASH_PARTITION);
 
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -610,7 +610,7 @@ public class TestPhysicalPlanner {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.SORT_PARTITION);
 
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -626,7 +626,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testPartitionedStorePlan() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), id, new FileFragment[] { frags[0] },
@@ -695,7 +695,7 @@ public class TestPhysicalPlanner {
   public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, PlanningException {
 
     // Preparing working dir and input fragments
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score_large", largeScore.getMeta(),
         new Path(largeScore.getPath()), Integer.MAX_VALUE);
     TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testPartitionedStorePlanWithMaxFileSize");
@@ -759,7 +759,7 @@ public class TestPhysicalPlanner {
   @Test
   public final void testPartitionedStorePlanWithEmptyGroupingSet()
       throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     TaskAttemptId id = LocalTajoTestingUtility.newTaskAttemptId(masterPlan);
 
@@ -826,7 +826,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testAggregationFunction() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testAggregationFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -857,7 +857,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCountFunction() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCountFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -885,7 +885,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByWithNullValue() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByWithNullValue");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -910,7 +910,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testUnionPlan() throws IOException, PlanningException, CloneNotSupportedException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testUnionPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -975,7 +975,7 @@ public class TestPhysicalPlanner {
 
   //@Test
   public final void testCreateIndex() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testCreateIndex");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -1002,7 +1002,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testDuplicateEliminate() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(),
         new Path(score.getPath()), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testDuplicateEliminate");
@@ -1036,7 +1036,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testSortEnforcer() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortEnforcer");
@@ -1087,7 +1087,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByEnforcer() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testGroupByEnforcer");
     Expr context = analyzer.parse(QUERIES[7]);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index 94ebe51..600f388 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -86,7 +86,7 @@ public class TestProgressExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
@@ -134,7 +134,7 @@ public class TestProgressExternalSortExec {
   }
 
   private void testProgress(int sortBufferBytesNum) throws Exception {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index 3455cb3..79db3bf 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -100,7 +100,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender1 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
@@ -130,7 +130,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender2 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
@@ -170,7 +170,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender3 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -229,9 +229,9 @@ public class TestRightOuterHashJoinExec {
 
   @Test
   public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+    FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
@@ -270,9 +270,9 @@ public class TestRightOuterHashJoinExec {
 
   @Test
   public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
-    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -311,9 +311,9 @@ public class TestRightOuterHashJoinExec {
     @Test
   public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
     
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index c6bf2ef..b67dc09 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -107,7 +107,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender1 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
@@ -146,7 +146,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV");
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender4 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
     Tuple tuple4 = new VTuple(dep4Schema.size());
@@ -178,7 +178,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender2 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
@@ -218,7 +218,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender3 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
@@ -271,7 +271,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender5 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
 
@@ -314,9 +314,9 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+    FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
 
@@ -352,9 +352,9 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] job3Frags =
-        FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin1");
@@ -388,9 +388,9 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] job3Frags =
-        FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuterMergeJoin2");
@@ -424,9 +424,9 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] dep4Frags =
-        FileStorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
 
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuter_MergeJoin3");
@@ -461,10 +461,10 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] phone3Frags =
-        FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
-        Integer.MAX_VALUE);
+        FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+            Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testRightOuter_MergeJoin4");
@@ -498,8 +498,8 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+        FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index a350831..6e0aa8e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -56,7 +56,7 @@ public class TestSortExec {
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
-  private static FileStorageManager sm;
+  private static FileTablespace sm;
   private static Path workDir;
   private static Path tablePath;
   private static TableMeta employeeMeta;
@@ -69,7 +69,7 @@ public class TestSortExec {
     util = TpchTestBase.getInstance().getTestingCluster();
     catalog = util.getMaster().getCatalog();
     workDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -81,7 +81,7 @@ public class TestSortExec {
     tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
     sm.getFileSystem().mkdirs(tablePath.getParent());
 
-    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
         .getAppender(employeeMeta, schema, tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
@@ -110,7 +110,7 @@ public class TestSortExec {
 
   @Test
   public final void testNext() throws IOException, PlanningException {
-    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortExec");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
         LocalTajoTestingUtility

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index d2faf7e..b6a4707 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -37,7 +37,7 @@ import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.hbase.*;
@@ -205,7 +205,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
         .getConnection(testingCluster.getHBaseUtil().getConf());
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -244,7 +244,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
         .getConnection(testingCluster.getHBaseUtil().getConf());
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -297,7 +297,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
         .getConnection(testingCluster.getHBaseUtil().getConf());
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -334,7 +334,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
         .getConnection(testingCluster.getHBaseUtil().getConf());
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -469,8 +469,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
     EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
         new ConstEval(new TextDatum("021")));
     scanNode.setQual(evalNodeEq);
-    StorageManager storageManager = TableSpaceManager.getStorageManager(conf, "HBASE");
-    List<Fragment> fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    Tablespace tablespace = TableSpaceManager.getStorageManager(conf, "HBASE");
+    List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
     assertEquals(1, fragments.size());
     assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
     assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
@@ -483,7 +483,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
     scanNode.setQual(evalNodeA);
 
-    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
     assertEquals(2, fragments.size());
     HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
     assertEquals("020", new String(fragment1.getStartRow()));
@@ -498,7 +498,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
         new ConstEval(new TextDatum("075")));
     EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
     scanNode.setQual(evalNodeB);
-    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
     assertEquals(3, fragments.size());
     fragment1 = (HBaseFragment) fragments.get(0);
     assertEquals("020", new String(fragment1.getStartRow()));
@@ -521,7 +521,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
     EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
     scanNode.setQual(evalNodeD);
-    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
     assertEquals(3, fragments.size());
 
     fragment1 = (HBaseFragment) fragments.get(0);
@@ -544,7 +544,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
     evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
     scanNode.setQual(evalNodeD);
-    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
+    fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
     assertEquals(2, fragments.size());
 
     fragment1 = (HBaseFragment) fragments.get(0);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 0e42412..a65c165 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -264,7 +264,7 @@ public class TestJoinQuery extends QueryTestCaseBase {
         }
         Path dataPath = new Path(table.getPath().toString(), fileIndex + ".csv");
         fileIndex++;
-        appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
+        appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
             .getAppender(tableMeta, schema, dataPath);
         appender.init();
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index 70d07c3..f6fd88f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -57,7 +57,7 @@ public class TestResultSet {
   private static TajoTestingCluster util;
   private static TajoConf conf;
   private static TableDesc desc;
-  private static FileStorageManager sm;
+  private static FileTablespace sm;
   private static TableMeta scoreMeta;
   private static Schema scoreSchema;
   private static List<ByteString> serializedData;
@@ -66,7 +66,7 @@ public class TestResultSet {
   public static void setup() throws Exception {
     util = TpchTestBase.getInstance().getTestingCluster();
     conf = util.getConfiguration();
-    sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
 
     scoreSchema = new Schema();
     scoreSchema.addColumn("deptname", Type.TEXT);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 5efdede..0a473b5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -33,7 +33,7 @@ import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.AfterClass;
@@ -82,7 +82,7 @@ public class TestExecutionBlockCursor {
     logicalPlanner = new LogicalPlanner(catalog);
     optimizer = new LogicalOptimizer(conf);
 
-    StorageManager sm  = TableSpaceManager.getFileStorageManager(conf);
+    Tablespace sm  = TableSpaceManager.getFileStorageManager(conf);
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 52b59ea..3ab2df6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -67,8 +67,8 @@ public class TestRowFile {
 
     TableMeta meta = CatalogUtil.newTableMeta("ROWFILE");
 
-    FileStorageManager sm =
-        (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
+    FileTablespace sm =
+        (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
 
     Path tablePath = new Path("/test");
     Path metaPath = new Path(tablePath, ".meta");

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index d007aea..a8926a0 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -64,7 +64,7 @@ public class MergeScanner implements Scanner {
 
     long numBytes = 0;
     for (Fragment eachFileFragment: rawFragmentList) {
-      long fragmentLength = StorageManager.getFragmentLength((TajoConf)conf, eachFileFragment);
+      long fragmentLength = Tablespace.getFragmentLength((TajoConf) conf, eachFileFragment);
       if (fragmentLength > 0) {
         numBytes += fragmentLength;
         fragments.add(eachFileFragment);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
deleted file mode 100644
index 0751035..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * StorageManager manages the functions of storing and reading data.
- * StorageManager is a abstract class.
- * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class.
- *
- */
-public abstract class StorageManager implements TableSpace {
-
-  public static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  protected TajoConf conf;
-  protected String storeType;
-
-  public StorageManager(String storeType) {
-    this.storeType = storeType;
-  }
-
-  /**
-   * Initialize storage manager.
-   * @throws java.io.IOException
-   */
-  protected abstract void storageInit() throws IOException;
-
-  /**
-   * This method is called after executing "CREATE TABLE" statement.
-   * If a storage is a file based storage, a storage manager may create directory.
-   *
-   * @param tableDesc Table description which is created.
-   * @param ifNotExists Creates the table only when the table does not exist.
-   * @throws java.io.IOException
-   */
-  @Override
-  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
-
-  /**
-   * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
-   * which is the option to delete all the data.
-   *
-   * @param tableDesc
-   * @throws java.io.IOException
-   */
-  @Override
-  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
-
-  /**
-   * Returns the splits that will serve as input for the scan tasks. The
-   * number of splits matches the number of regions in a table.
-   * @param fragmentId The table name or previous ExecutionBlockId
-   * @param tableDesc The table description for the target data.
-   * @param scanNode The logical node for scanning.
-   * @return The list of input fragments.
-   * @throws java.io.IOException
-   */
-  @Override
-  public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
-                                           ScanNode scanNode) throws IOException;
-
-  /**
-   * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'.
-   * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation.
-   * @param tableDesc The table description for the target data.
-   * @param currentPage The current page number within the entire list.
-   * @param numFragments The number of fragments in the result.
-   * @return The list of input fragments.
-   * @throws java.io.IOException
-   */
-  public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
-      throws IOException;
-
-  /**
-   * It returns the storage property.
-   * @return The storage property
-   */
-  public abstract StorageProperty getStorageProperty();
-
-  /**
-   * Release storage manager resource
-   */
-  @Override
-  public abstract void close();
-
-
-  /**
-   * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
-   * In general Repartitioner determines the partition range using previous output statistics data.
-   * In the special cases, such as HBase Repartitioner uses the result of this method.
-   *
-   * @param queryContext The current query context which contains query properties.
-   * @param tableDesc The table description for the target data.
-   * @param inputSchema The input schema
-   * @param sortSpecs The sort specification that contains the sort column and sort order.
-   * @return The list of sort ranges.
-   * @throws java.io.IOException
-   */
-  public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
-                                                   Schema inputSchema, SortSpec[] sortSpecs,
-                                                   TupleRange dataRange) throws IOException;
-
-  /**
-   * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
-   * In general Tajo creates the target table after finishing the final sub-query of CATS.
-   * But In the special cases, such as HBase INSERT or CAST query uses the target table information.
-   * That kind of the storage should implements the logic related to creating table in this method.
-   *
-   * @param node The child node of the root node.
-   * @throws java.io.IOException
-   */
-  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
-
-  /**
-   * It is called when the query failed.
-   * Each storage manager should implement to be processed when the query fails in this method.
-   *
-   * @param node The child node of the root node.
-   * @throws java.io.IOException
-   */
-  public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;
-
-  /**
-   * Returns the current storage type.
-   * @return
-   */
-  public String getStoreType() {
-    return storeType;
-  }
-
-  /**
-   * Initialize StorageManager instance. It should be called before using.
-   *
-   * @param tajoConf
-   * @throws java.io.IOException
-   */
-  public void init(TajoConf tajoConf) throws IOException {
-    this.conf = tajoConf;
-    storageInit();
-  }
-
-  /**
-   * Returns the splits that will serve as input for the scan tasks. The
-   * number of splits matches the number of regions in a table.
-   *
-   * @param fragmentId The table name or previous ExecutionBlockId
-   * @param tableDesc The table description for the target data.
-   * @return The list of input fragments.
-   * @throws java.io.IOException
-   */
-  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException {
-    return getSplits(fragmentId, tableDesc, null);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target Columns which are selected.
-   * @return Scanner instance
-   * @throws java.io.IOException
-   */
-  @Override
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @return Scanner instance
-   * @throws java.io.IOException
-   */
-  @Override
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
-    return getScanner(meta, schema, fragment, schema);
-  }
-
-  /**
-   * Returns Scanner instance.
-   *
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target The output schema
-   * @return Scanner instance
-   * @throws java.io.IOException
-   */
-  @Override
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
-    if (fragment.isEmpty()) {
-      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
-      scanner.setTarget(target.toArray());
-
-      return scanner;
-    }
-
-    Scanner scanner;
-
-    Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
-    scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
-    scanner.setTarget(target.toArray());
-
-    return scanner;
-  }
-
-  /**
-   * Returns Appender instance.
-   * @param queryContext Query property.
-   * @param taskAttemptId Task id.
-   * @param meta Table meta data.
-   * @param schema Output schema.
-   * @param workDir Working directory
-   * @return Appender instance
-   * @throws java.io.IOException
-   */
-  public Appender getAppender(OverridableConf queryContext,
-                              TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
-      throws IOException {
-    Appender appender;
-
-    Class<? extends Appender> appenderClass;
-
-    String handlerName = meta.getStoreType().toLowerCase();
-    appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
-    if (appenderClass == null) {
-      appenderClass = conf.getClass(
-          String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
-      TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
-    }
-
-    if (appenderClass == null) {
-      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
-    }
-
-    appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
-
-    return appender;
-  }
-
-  /**
-   * Return the Scanner class for the StoreType that is defined in storage-default.xml.
-   *
-   * @param storeType store type
-   * @return The Scanner class
-   * @throws java.io.IOException
-   */
-  public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
-    String handlerName = storeType.toLowerCase();
-    Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
-    if (scannerClass == null) {
-      scannerClass = conf.getClass(
-          String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
-      TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
-    }
-
-    if (scannerClass == null) {
-      throw new IOException("Unknown Storage Type: " + storeType);
-    }
-
-    return scannerClass;
-  }
-
-  /**
-   * Return length of the fragment.
-   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
-   *
-   * @param conf Tajo system property
-   * @param fragment Fragment
-   * @return
-   */
-  public static long getFragmentLength(TajoConf conf, Fragment fragment) {
-    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
-      return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
-    } else {
-      return fragment.getLength();
-    }
-  }
-
-  /**
-   * It is called after making logical plan. Storage manager should verify the schema for inserting.
-   *
-   * @param tableDesc The table description of insert target.
-   * @param outSchema  The output schema of select query for inserting.
-   * @throws java.io.IOException
-   */
-  @Override
-  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
-    // nothing to do
-  }
-
-  /**
-   * Returns the list of storage specified rewrite rules.
-   * This values are used by LogicalOptimizer.
-   *
-   * @param queryContext The query property
-   * @param tableDesc The description of the target table.
-   * @return The list of storage specified rewrite rules
-   * @throws java.io.IOException
-   */
-  @Override
-  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
-      throws IOException {
-    return null;
-  }
-
-  /**
-   * Finalizes result data. Tajo stores result data in the staging directory.
-   * If the query fails, clean up the staging directory.
-   * Otherwise the query is successful, move to the final directory from the staging directory.
-   *
-   * @param queryContext The query property
-   * @param finalEbId The final execution block id
-   * @param plan The query plan
-   * @param schema The final output schema
-   * @param tableDesc The description of the target table
-   * @return Saved path
-   * @throws java.io.IOException
-   */
-  @Override
-  public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
deleted file mode 100644
index ef4aa9a..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * It manages each tablespace; e.g., HDFS, Local file system, and Amazon S3.
- */
-public interface TableSpace extends Closeable {
-  //public void format() throws IOException;
-
-  void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
-
-  void purgeTable(TableDesc tableDesc) throws IOException;
-
-  List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException;
-
-  List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException;
-
-//  public void renameTable() throws IOException;
-//
-//  public void truncateTable() throws IOException;
-//
-//  public long availableCapacity() throws IOException;
-//
-//  public long totalCapacity() throws IOException;
-
-  Scanner getScanner(TableMeta meta, Schema schema, CatalogProtos.FragmentProto fragment, Schema target) throws IOException;
-
-  Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException;
-
-  Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
-
-  Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException;
-
-  void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
-
-  List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException;
-
-  void close() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
index 42a5e07..a787cdb 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
@@ -63,10 +63,10 @@ public class TableSpaceManager {
       Path.class
   };
   /**
-   * Cache of StorageManager.
+   * Cache of Tablespace.
    * Key is manager key(warehouse path) + store type
    */
-  private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
+  private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
   /**
    * Cache of constructors for each class. Pins the classes so they
    * can't be garbage collected until ReflectionUtils can be collected.
@@ -86,13 +86,13 @@ public class TableSpaceManager {
   }
 
   /**
-   * Close StorageManager
+   * Close Tablespace
    * @throws java.io.IOException
    */
   public static void shutdown() throws IOException {
     synchronized(storageManagers) {
-      for (StorageManager eachStorageManager: storageManagers.values()) {
-        eachStorageManager.close();
+      for (Tablespace eachTablespace : storageManagers.values()) {
+        eachTablespace.close();
       }
     }
     clearCache();
@@ -105,19 +105,19 @@ public class TableSpaceManager {
    * @return
    * @throws IOException
    */
-  public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
+  public static Tablespace getFileStorageManager(TajoConf tajoConf) throws IOException {
     return getStorageManager(tajoConf, "CSV");
   }
 
   /**
-   * Returns the proper StorageManager instance according to the storeType.
+   * Returns the proper Tablespace instance according to the storeType.
    *
    * @param tajoConf Tajo system property.
    * @param storeType Storage type
    * @return
    * @throws IOException
    */
-  public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+  public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
     FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
     if (fileSystem != null) {
       return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
@@ -127,7 +127,7 @@ public class TableSpaceManager {
   }
 
   /**
-   * Returns the proper StorageManager instance according to the storeType
+   * Returns the proper Tablespace instance according to the storeType
    *
    * @param tajoConf Tajo system property.
    * @param storeType Storage type
@@ -135,7 +135,7 @@ public class TableSpaceManager {
    * @return
    * @throws IOException
    */
-  private static synchronized StorageManager getStorageManager (
+  private static synchronized Tablespace getStorageManager (
       TajoConf tajoConf, String storeType, String managerKey) throws IOException {
 
     String typeName;
@@ -147,19 +147,19 @@ public class TableSpaceManager {
 
     synchronized (storageManagers) {
       String storeKey = typeName + "_" + managerKey;
-      StorageManager manager = storageManagers.get(storeKey);
+      Tablespace manager = storageManagers.get(storeKey);
 
       if (manager == null) {
-        Class<? extends StorageManager> storageManagerClass =
-            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
+        Class<? extends Tablespace> storageManagerClass =
+            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
 
         if (storageManagerClass == null) {
           throw new IOException("Unknown Storage Type: " + typeName);
         }
 
         try {
-          Constructor<? extends StorageManager> constructor =
-              (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+          Constructor<? extends Tablespace> constructor =
+              (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
           if (constructor == null) {
             constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
             constructor.setAccessible(true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
new file mode 100644
index 0000000..0626da8
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Tablespace manages the functions of storing and reading data.
+ * Tablespace is a abstract class.
+ * For supporting such as HDFS, HBASE, a specific Tablespace should be implemented by inheriting this class.
+ *
+ */
+public abstract class Tablespace {
+
+  public static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  protected TajoConf conf;
+  protected String storeType;
+
+  public Tablespace(String storeType) {
+    this.storeType = storeType;
+  }
+
+  /**
+   * Initialize storage manager.
+   * @throws java.io.IOException
+   */
+  protected abstract void storageInit() throws IOException;
+
+  /**
+   * This method is called after executing "CREATE TABLE" statement.
+   * If a storage is a file based storage, a storage manager may create directory.
+   *
+   * @param tableDesc Table description which is created.
+   * @param ifNotExists Creates the table only when the table does not exist.
+   * @throws java.io.IOException
+   */
+  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
+
+  /**
+   * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
+   * which is the option to delete all the data.
+   *
+   * @param tableDesc
+   * @throws java.io.IOException
+   */
+  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @param scanNode The logical node for scanning.
+   * @return The list of input fragments.
+   * @throws java.io.IOException
+   */
+  public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
+                                           ScanNode scanNode) throws IOException;
+
+  /**
+   * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'.
+   * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation.
+   * @param tableDesc The table description for the target data.
+   * @param currentPage The current page number within the entire list.
+   * @param numFragments The number of fragments in the result.
+   * @return The list of input fragments.
+   * @throws java.io.IOException
+   */
+  public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
+      throws IOException;
+
+  /**
+   * It returns the storage property.
+   * @return The storage property
+   */
+  public abstract StorageProperty getStorageProperty();
+
+  /**
+   * Release storage manager resource
+   */
+  public abstract void close();
+
+
+  /**
+   * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
+   * In general Repartitioner determines the partition range using previous output statistics data.
+   * In the special cases, such as HBase Repartitioner uses the result of this method.
+   *
+   * @param queryContext The current query context which contains query properties.
+   * @param tableDesc The table description for the target data.
+   * @param inputSchema The input schema
+   * @param sortSpecs The sort specification that contains the sort column and sort order.
+   * @return The list of sort ranges.
+   * @throws java.io.IOException
+   */
+  public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
+                                                   Schema inputSchema, SortSpec[] sortSpecs,
+                                                   TupleRange dataRange) throws IOException;
+
+  /**
+   * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
+   * In general Tajo creates the target table after finishing the final sub-query of CATS.
+   * But In the special cases, such as HBase INSERT or CAST query uses the target table information.
+   * That kind of the storage should implements the logic related to creating table in this method.
+   *
+   * @param node The child node of the root node.
+   * @throws java.io.IOException
+   */
+  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
+
+  /**
+   * It is called when the query failed.
+   * Each storage manager should implement to be processed when the query fails in this method.
+   *
+   * @param node The child node of the root node.
+   * @throws java.io.IOException
+   */
+
+  /**
+   * Returns the current storage type.
+   * @return
+   */
+  public String getStoreType() {
+    return storeType;
+  }
+
+  /**
+   * Initialize Tablespace instance. It should be called before using.
+   *
+   * @param tajoConf
+   * @throws java.io.IOException
+   */
+  public void init(TajoConf tajoConf) throws IOException {
+    this.conf = tajoConf;
+    storageInit();
+  }
+
+  /**
+   * Returns the splits that will serve as input for the scan tasks. The
+   * number of splits matches the number of regions in a table.
+   *
+   * @param fragmentId The table name or previous ExecutionBlockId
+   * @param tableDesc The table description for the target data.
+   * @return The list of input fragments.
+   * @throws java.io.IOException
+   */
+  public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException {
+    return getSplits(fragmentId, tableDesc, null);
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target Columns which are selected.
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
+    return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
+    return getScanner(meta, schema, fragment, schema);
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    if (fragment.isEmpty()) {
+      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
+      scanner.setTarget(target.toArray());
+
+      return scanner;
+    }
+
+    Scanner scanner;
+
+    Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
+    scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
+    scanner.setTarget(target.toArray());
+
+    return scanner;
+  }
+
+  /**
+   * Returns Appender instance.
+   * @param queryContext Query property.
+   * @param taskAttemptId Task id.
+   * @param meta Table meta data.
+   * @param schema Output schema.
+   * @param workDir Working directory
+   * @return Appender instance
+   * @throws java.io.IOException
+   */
+  public Appender getAppender(OverridableConf queryContext,
+                              TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
+      throws IOException {
+    Appender appender;
+
+    Class<? extends Appender> appenderClass;
+
+    String handlerName = meta.getStoreType().toLowerCase();
+    appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
+    if (appenderClass == null) {
+      appenderClass = conf.getClass(
+          String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
+      TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+    }
+
+    if (appenderClass == null) {
+      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+    }
+
+    appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
+
+    return appender;
+  }
+
+  /**
+   * Return the Scanner class for the StoreType that is defined in storage-default.xml.
+   *
+   * @param storeType store type
+   * @return The Scanner class
+   * @throws java.io.IOException
+   */
+  public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
+    String handlerName = storeType.toLowerCase();
+    Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
+    if (scannerClass == null) {
+      scannerClass = conf.getClass(
+          String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
+      TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+    }
+
+    if (scannerClass == null) {
+      throw new IOException("Unknown Storage Type: " + storeType);
+    }
+
+    return scannerClass;
+  }
+
+  /**
+   * Return length of the fragment.
+   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+   *
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return
+   */
+  public static long getFragmentLength(TajoConf conf, Fragment fragment) {
+    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+      return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+    } else {
+      return fragment.getLength();
+    }
+  }
+
+  public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;
+
+  /**
+   * It is called after making logical plan. Storage manager should verify the schema for inserting.
+   *
+   * @param tableDesc The table description of insert target.
+   * @param outSchema  The output schema of select query for inserting.
+   * @throws java.io.IOException
+   */
+  public abstract void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
+
+  /**
+   * Returns the list of storage specified rewrite rules.
+   * This values are used by LogicalOptimizer.
+   *
+   * @param queryContext The query property
+   * @param tableDesc The description of the target table.
+   * @return The list of storage specified rewrite rules
+   * @throws java.io.IOException
+   */
+  public abstract List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
+      throws IOException;
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * If the query fails, clean up the staging directory.
+   * Otherwise the query is successful, move to the final directory from the staging directory.
+   *
+   * @param queryContext The query property
+   * @param finalEbId The final execution block id
+   * @param plan The query plan
+   * @param schema The final output schema
+   * @param tableDesc The description of the target table
+   * @return Saved path
+   * @throws java.io.IOException
+   */
+  public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
index aa078a7..93611fb 100644
--- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml
@@ -23,11 +23,11 @@
   <!-- Storage Manager Configuration -->
   <property>
     <name>tajo.storage.manager.hdfs.class</name>
-    <value>org.apache.tajo.storage.FileStorageManager</value>
+    <value>org.apache.tajo.storage.FileTablespace</value>
   </property>
   <property>
     <name>tajo.storage.manager.hbase.class</name>
-    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+    <value>org.apache.tajo.storage.hbase.HBaseTablespace</value>
   </property>
 
   <property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
index 712f664..6aa32fc 100644
--- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
+++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml
@@ -28,11 +28,11 @@
   <!-- Storage Manager Configuration -->
   <property>
     <name>tajo.storage.manager.hdfs.class</name>
-    <value>org.apache.tajo.storage.FileStorageManager</value>
+    <value>org.apache.tajo.storage.FileTablespace</value>
   </property>
   <property>
     <name>tajo.storage.manager.hbase.class</name>
-    <value>org.apache.tajo.storage.hbase.HBaseStorageManager</value>
+    <value>org.apache.tajo.storage.hbase.HBaseTablespace</value>
   </property>
 
   <!--- Registered Scanner Handler -->

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 09a86b4..b1a2d59 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -46,8 +46,8 @@ public class HBasePutAppender extends AbstractHBaseAppender {
   public void init() throws IOException {
     super.init();
 
-    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
-    HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
+    Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
+    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
         .getConnection(hbaseConf);
     htable = hconn.getTable(columnMapping.getHbaseTableName());
     htable.setAutoFlushTo(false);

http://git-wip-us.apache.org/repos/asf/tajo/blob/d261234f/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 24bfd4d..992c13c 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -133,7 +133,7 @@ public class HBaseScanner implements Scanner {
     rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
     rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
 
-    hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
+    hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
 
     initScanner();
   }
@@ -181,7 +181,7 @@ public class HBaseScanner implements Scanner {
     }
 
     if (htable == null) {
-      HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
+      HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
           .getConnection(hbaseConf);
       htable = hconn.getTable(fragment.getHbaseTableName());
     }