You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:51 UTC

[37/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index c11db6f..925c047 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -30,12 +30,10 @@ import java.io.IOException;
 
 public class TajoQueryEngine {
 
-  private final StorageManager storageManager;
   private final PhysicalPlanner phyPlanner;
 
   public TajoQueryEngine(TajoConf conf) throws IOException {
-    this.storageManager = StorageManager.getStorageManager(conf);
-    this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager);
+    this.phyPlanner = new PhysicalPlannerImpl(conf);
   }
   
   public PhysicalExec createPlan(TaskAttemptContext ctx, LogicalNode plan)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index cb038df..00eabcc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
 import org.apache.tajo.catalog.Schema;
@@ -53,10 +52,7 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.BaseTupleComparator;
-import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
@@ -108,39 +104,6 @@ public class Task {
   private Schema finalSchema = null;
   private TupleComparator sortComp = null;
 
-  static final String OUTPUT_FILE_PREFIX="part-";
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(2);
-          return fmt;
-        }
-      };
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(6);
-          return fmt;
-        }
-      };
-
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(3);
-          return fmt;
-        }
-      };
-
   public Task(String taskRunnerId,
               Path baseDir,
               QueryUnitAttemptId taskId,
@@ -190,13 +153,8 @@ public class Task {
         this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
-      // The final result of a task will be written in a file named part-ss-nnnnnnn,
-      // where ss is the subquery id associated with this task, and nnnnnn is the task id.
-      Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
-          OUTPUT_FILE_PREFIX +
-          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
-          OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()) + "-" +
-          OUTPUT_FILE_FORMAT_SEQ.get().format(0));
+      Path outFilePath = ((FileStorageManager)StorageManager.getFileStorageManager(systemConf))
+          .getAppenderFilePath(taskId, queryContext.getStagingDir());
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
index 18a67d8..49635d1 100644
--- a/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/queryunit.jsp
@@ -41,6 +41,7 @@
 <%@ page import="java.text.SimpleDateFormat" %>
 <%@ page import="java.util.Map" %>
 <%@ page import="java.util.Set" %>
+<%@ page import="org.apache.tajo.storage.fragment.Fragment" %>
 
 <%
     String paramQueryId = request.getParameter("queryId");
@@ -102,8 +103,8 @@
     String fragmentInfo = "";
     String delim = "";
     for (CatalogProtos.FragmentProto eachFragment : queryUnit.getAllFragments()) {
-        FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
-        fragmentInfo += delim + fileFragment.toString();
+        Fragment fragment = FragmentConvertor.convert(tajoWorker.getConfig(), eachFragment);
+        fragmentInfo += delim + fragment.toString();
         delim = "<br/>";
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 45d3c51..fb98be2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -47,7 +47,7 @@ public class BackendTestingUtil {
 
   public static void writeTmpTable(TajoConf conf, Path tablePath)
       throws IOException {
-    StorageManager sm = StorageManager.getStorageManager(conf, tablePath);
+    FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, tablePath);
     FileSystem fs = sm.getFileSystem();
 
     Appender appender;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
new file mode 100644
index 0000000..a8e4a5c
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.tajo.util.Bytes;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
+
+public class HBaseTestClusterUtil {
+  private static final Log LOG = LogFactory.getLog(HBaseTestClusterUtil.class);
+  private Configuration conf;
+  private MiniHBaseCluster hbaseCluster;
+  private MiniZooKeeperCluster zkCluster;
+  private File testBaseDir;
+  public HBaseTestClusterUtil(Configuration conf, File testBaseDir) {
+    this.conf = conf;
+    this.testBaseDir = testBaseDir;
+  }
+  /**
+   * Returns the path to the default root dir the minicluster uses.
+   * Note: this does not cause the root dir to be created.
+   * @return Fully qualified path for the default hbase root dir
+   * @throws java.io.IOException
+   */
+  public Path getDefaultRootDirPath() throws IOException {
+    FileSystem fs = FileSystem.get(this.conf);
+    return new Path(fs.makeQualified(fs.getHomeDirectory()),"hbase");
+  }
+
+  /**
+   * Creates an hbase rootdir in user home directory.  Also creates hbase
+   * version file.  Normally you won't make use of this method.  Root hbasedir
+   * is created for you as part of mini cluster startup.  You'd only use this
+   * method if you were doing manual operation.
+   * @return Fully qualified path to hbase root dir
+   * @throws java.io.IOException
+   */
+  public Path createRootDir() throws IOException {
+    FileSystem fs = FileSystem.get(this.conf);
+    Path hbaseRootdir = getDefaultRootDirPath();
+    FSUtils.setRootDir(this.conf, hbaseRootdir);
+    fs.mkdirs(hbaseRootdir);
+    FSUtils.setVersion(fs, hbaseRootdir);
+    return hbaseRootdir;
+  }
+
+  public void stopHBaseCluster() throws IOException {
+    if (hbaseCluster != null) {
+      LOG.info("MiniHBaseCluster stopped");
+      hbaseCluster.shutdown();
+      hbaseCluster.waitUntilShutDown();
+      hbaseCluster = null;
+    }
+  }
+
+  public void startHBaseCluster() throws Exception {
+    if (zkCluster == null) {
+      startMiniZKCluster();
+    }
+    if (hbaseCluster != null) {
+      return;
+    }
+
+    System.setProperty("HBASE_ZNODE_FILE", testBaseDir + "/hbase_znode_file");
+    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) {
+      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, 1);
+    }
+    if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, -1) == -1) {
+      conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1);
+    }
+    conf.setBoolean(REPLICATION_ENABLE_KEY, false);
+    createRootDir();
+
+    Configuration c = HBaseConfiguration.create(this.conf);
+
+    hbaseCluster = new MiniHBaseCluster(c, 1);
+
+    // Don't leave here till we've done a successful scan of the hbase:meta
+    HTable t = new HTable(c, TableName.META_TABLE_NAME);
+    ResultScanner s = t.getScanner(new Scan());
+    while (s.next() != null) {
+      continue;
+    }
+    s.close();
+    t.close();
+    LOG.info("MiniHBaseCluster started");
+
+  }
+
+  /**
+   * Start a mini ZK cluster. If the property "test.hbase.zookeeper.property.clientPort" is set
+   *  the port mentionned is used as the default port for ZooKeeper.
+   */
+  public MiniZooKeeperCluster startMiniZKCluster()
+      throws Exception {
+    File zkDataPath = new File(testBaseDir, "zk");
+    if (this.zkCluster != null) {
+      throw new IOException("Cluster already running at " + zkDataPath);
+    }
+    this.zkCluster = new MiniZooKeeperCluster(conf);
+    final int defPort = this.conf.getInt("test.hbase.zookeeper.property.clientPort", 0);
+    if (defPort > 0){
+      // If there is a port in the config file, we use it.
+      this.zkCluster.setDefaultClientPort(defPort);
+    }
+    int clientPort =  this.zkCluster.startup(zkDataPath, 1);
+    this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, Integer.toString(clientPort));
+    LOG.info("MiniZooKeeperCluster started");
+
+    return this.zkCluster;
+  }
+
+  public void stopZooKeeperCluster() throws IOException {
+    if (zkCluster != null) {
+      LOG.info("MiniZooKeeperCluster stopped");
+      zkCluster.shutdown();
+      zkCluster = null;
+    }
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public MiniZooKeeperCluster getMiniZooKeeperCluster() {
+    return zkCluster;
+  }
+
+  public MiniHBaseCluster getMiniHBaseCluster() {
+    return hbaseCluster;
+  }
+
+  public HTableDescriptor getTableDescriptor(String tableName) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      return admin.getTableDescriptor(Bytes.toBytes(tableName));
+    } finally {
+      admin.close();
+    }
+  }
+
+  public void createTable(HTableDescriptor hTableDesc) throws IOException {
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    try {
+      admin.createTable(hTableDesc);
+    } finally {
+      admin.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index ecfb9f5..875e450 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -169,9 +169,9 @@ public class QueryTestCaseBase {
   private static String currentDatabase;
   private static Set<String> createdTableGlobalSet = new HashSet<String>();
   // queries and results directory corresponding to subclass class.
-  private Path currentQueryPath;
-  private Path currentResultPath;
-  private Path currentDatasetPath;
+  protected Path currentQueryPath;
+  protected Path currentResultPath;
+  protected Path currentDatasetPath;
 
   // for getting a method name
   @Rule public TestName name = new TestName();
@@ -303,7 +303,7 @@ public class QueryTestCaseBase {
     return executeFile(getMethodName() + ".sql");
   }
 
-  private String getMethodName() {
+  protected String getMethodName() {
     String methodName = name.getMethodName();
     // In the case of parameter execution name's pattern is methodName[0]
     if (methodName.endsWith("]")) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 757ba0f..64c27e0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -70,6 +70,7 @@ public class TajoTestingCluster {
   private FileSystem defaultFS;
   private MiniDFSCluster dfsCluster;
 	private MiniCatalogServer catalogServer;
+  private HBaseTestClusterUtil hbaseUtil;
 
   private TajoMaster tajoMaster;
   private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
@@ -284,6 +285,10 @@ public class TajoTestingCluster {
     return this.defaultFS;
   }
 
+  public HBaseTestClusterUtil getHBaseUtil() {
+    return hbaseUtil;
+  }
+
   ////////////////////////////////////////////////////////
   // Catalog Section
   ////////////////////////////////////////////////////////
@@ -507,6 +512,8 @@ public class TajoTestingCluster {
     startMiniDFSCluster(numDataNodes, this.clusterTestBuildDir, dataNodeHosts);
     this.dfsCluster.waitClusterUp();
 
+    hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
+
     if(!standbyWorkerMode) {
       startMiniYarnCluster();
     }
@@ -594,7 +601,6 @@ public class TajoTestingCluster {
     }
 
     if(this.dfsCluster != null) {
-
       try {
         FileSystem fs = this.dfsCluster.getFileSystem();
         if (fs != null) fs.close();
@@ -613,6 +619,10 @@ public class TajoTestingCluster {
       }
       this.clusterTestBuildDir = null;
     }
+
+    hbaseUtil.stopZooKeeperCluster();
+    hbaseUtil.stopHBaseCluster();
+
     LOG.info("Minicluster is down");
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 3437e3a..9ce7b5b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -40,10 +40,10 @@ import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
@@ -380,7 +380,7 @@ public class TestPlannerUtil {
     int index = 0;
 
     for (int i = startIndex; i < startIndex + expectedSize; i++, index++) {
-      FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[index]);
+      FileFragment fragment = FragmentConvertor.convert(util.getConfiguration(), fragments[index]);
       assertEquals(expectedFiles.get(i), fragment.getPath());
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
index 0e7f3e6..3803c7a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -42,10 +42,7 @@ import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -140,8 +137,8 @@ public class TestBroadcastJoinPlan {
         contentsData += j;
       }
     }
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(tableMeta, schema,
-        dataPath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(tableMeta, schema, dataPath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     int writtenSize = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index aef8064..6a6aafb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -79,7 +79,7 @@ public class TestBNLJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -89,7 +89,8 @@ public class TestBNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, schema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
@@ -110,7 +111,8 @@ public class TestBNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
@@ -150,10 +152,10 @@ public class TestBNLJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
 
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
         new Path(employee.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
         new Path(people.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -162,7 +164,7 @@ public class TestBNLJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -183,9 +185,9 @@ public class TestBNLJoinExec {
     LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
         context).getRootBlock().getRoot();
 
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), 
         new Path(employee.getPath()), Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), 
         new Path(people.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
@@ -201,7 +203,7 @@ public class TestBNLJoinExec {
     ctx.setEnforcer(enforcer);
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 60ae849..dc3c28d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -66,7 +66,7 @@ public class TestBSTIndexExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private StorageManager sm;
+  private FileStorageManager sm;
   private Schema idxSchema;
   private BaseTupleComparator comp;
   private BSTIndex.BSTIndexWriter writer;
@@ -91,7 +91,7 @@ public class TestBSTIndexExec {
     Path workDir = CommonTestingUtil.getTestDir();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = StorageManager.getStorageManager(conf, workDir);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir);
 
     idxPath = new Path(workDir, "test.idx");
 
@@ -117,8 +117,7 @@ public class TestBSTIndexExec {
     fs = tablePath.getFileSystem(conf);
     fs.mkdirs(tablePath.getParent());
 
-    FileAppender appender = (FileAppender)StorageManager.getStorageManager(conf).getAppender(meta, schema,
-        tablePath);
+    FileAppender appender = (FileAppender)sm.getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = 0; i < 10000; i++) {
@@ -164,7 +163,7 @@ public class TestBSTIndexExec {
     this.rndKey = rnd.nextInt(250);
     final String QUERY = "select * from employee where managerId = " + rndKey;
     
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
         LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
@@ -172,7 +171,7 @@ public class TestBSTIndexExec {
     LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
+    TmpPlanner phyPlanner = new TmpPlanner(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     int tupleCount = this.randomValues.get(rndKey);
@@ -186,8 +185,8 @@ public class TestBSTIndexExec {
   }
 
   private class TmpPlanner extends PhysicalPlannerImpl {
-    public TmpPlanner(TajoConf conf, StorageManager sm) {
-      super(conf, sm);
+    public TmpPlanner(TajoConf conf) {
+      super(conf);
     }
 
     @Override
@@ -196,12 +195,11 @@ public class TestBSTIndexExec {
       Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
           "Error: There is no table matched to %s", scanNode.getTableName());
 
-      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(),
-          ctx.getTables(scanNode.getTableName()));
+      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), ctx.getTables(scanNode.getTableName()));
       
       Datum[] datum = new Datum[]{DatumFactory.createInt4(rndKey)};
 
-      return new BSTIndexScanExec(ctx, sm, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
+      return new BSTIndexScanExec(ctx, scanNode, fragments.get(0), idxPath, idxSchema, comp , datum);
 
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index bb40b28..c0bf6ce 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -59,7 +59,7 @@ public class TestExternalSortExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private StorageManager sm;
+  private FileStorageManager sm;
   private Path testDir;
 
   private final int numTuple = 100000;
@@ -76,7 +76,7 @@ public class TestExternalSortExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -85,7 +85,8 @@ public class TestExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
     Tuple tuple = new VTuple(schema.size());
@@ -121,7 +122,7 @@ public class TestExternalSortExec {
 
   @Test
   public final void testNext() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -131,7 +132,7 @@ public class TestExternalSortExec {
     LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     
     ProjectionExec proj = (ProjectionExec) exec;
@@ -141,8 +142,7 @@ public class TestExternalSortExec {
       UnaryPhysicalExec sortExec = proj.getChild();
       SeqScanExec scan = sortExec.getChild();
 
-      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
-          ((MemSortExec)sortExec).getPlan(), scan);
+      ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan);
       proj.setChild(extSort);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index c4ce43b..ecd1c23 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -61,7 +61,7 @@ public class TestFullOuterHashJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private StorageManager sm;
+  private FileStorageManager sm;
   private Path testDir;
   private QueryContext defaultContext;
 
@@ -84,7 +84,7 @@ public class TestFullOuterHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -107,7 +107,8 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -136,7 +137,8 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -175,7 +177,8 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -227,8 +230,8 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
-        phone3Path);
+    Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
 
     appender5.flush();
@@ -266,9 +269,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
 
@@ -277,7 +280,7 @@ public class TestFullOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -305,9 +308,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
@@ -316,7 +319,7 @@ public class TestFullOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -343,9 +346,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
 
@@ -354,7 +357,7 @@ public class TestFullOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -382,9 +385,9 @@ public class TestFullOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
@@ -394,7 +397,7 @@ public class TestFullOuterHashJoinExec {
         workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index a82de92..a81979f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -40,10 +40,7 @@ import org.apache.tajo.plan.logical.JoinNode;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -92,7 +89,7 @@ public class TestFullOuterMergeJoinExec {
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
 
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -115,7 +112,8 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -153,7 +151,8 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = StorageManager.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+    Appender appender4 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
     Tuple tuple4 = new VTuple(dep4Schema.size());
     for (int i = 0; i < 11; i++) {
@@ -184,7 +183,8 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -223,7 +223,8 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -275,8 +276,8 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
-        phone3Path);
+    Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
     appender5.flush();
     appender5.close();
@@ -318,9 +319,9 @@ public class TestFullOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] dep3Frags =
-        StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
@@ -328,7 +329,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -355,9 +356,9 @@ public class TestFullOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] job3Frags =
-        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
@@ -365,7 +366,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -392,9 +393,9 @@ public class TestFullOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] job3Frags =
-        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
@@ -402,7 +403,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -430,9 +431,9 @@ public class TestFullOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] dep4Frags =
-        StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
@@ -440,7 +441,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -470,9 +471,9 @@ public class TestFullOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] phone3Frags =
-        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+        FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
@@ -481,7 +482,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -509,9 +510,9 @@ public class TestFullOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] phone3Frags =
-        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+        FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
 
@@ -520,7 +521,7 @@ public class TestFullOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -537,9 +538,4 @@ public class TestFullOuterMergeJoinExec {
     exec.close();
     assertEquals(7, count);
   }
-
-
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 20d4651..4fe6ff2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -76,7 +76,7 @@ public class TestHashAntiJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -86,8 +86,8 @@ public class TestHashAntiJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
-        employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
 
@@ -112,7 +112,8 @@ public class TestHashAntiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 10; i += 2) {
@@ -150,9 +151,9 @@ public class TestHashAntiJoinExec {
 
   @Test
   public final void testHashAntiJoin() throws IOException, PlanningException {
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
         new Path(people.getPath()), Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -166,7 +167,7 @@ public class TestHashAntiJoinExec {
     optimizer.optimize(plan);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     // replace an equal join with an hash anti join.

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index d1fa28a..55e87d4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -78,7 +78,7 @@ public class TestHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -88,8 +88,8 @@ public class TestHashJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
-        employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
     for (int i = 0; i < 10; i++) {
@@ -111,7 +111,8 @@ public class TestHashJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 10; i += 2) {
@@ -152,9 +153,9 @@ public class TestHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
         new Path(people.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
@@ -163,7 +164,7 @@ public class TestHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -195,9 +196,9 @@ public class TestHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
         new Path(people.getPath()), Integer.MAX_VALUE);
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
@@ -207,7 +208,7 @@ public class TestHashJoinExec {
     ctx.setEnforcer(enforcer);
 
     ctx.getQueryContext().setLong(SessionVars.HASH_JOIN_SIZE_LIMIT.keyname(), 100l);
-    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlannerImpl phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 7a43a55..a2f1155 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -38,10 +38,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -80,7 +77,7 @@ public class TestHashSemiJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -90,8 +87,8 @@ public class TestHashSemiJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
-        employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
 
@@ -116,7 +113,8 @@ public class TestHashSemiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     // make 27 tuples
@@ -158,9 +156,9 @@ public class TestHashSemiJoinExec {
 
   @Test
   public final void testHashSemiJoin() throws IOException, PlanningException {
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
         new Path(people.getPath()), Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -174,7 +172,7 @@ public class TestHashSemiJoinExec {
     optimizer.optimize(plan);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     // replace an equal join with an hash anti join.

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index ec9daa7..0477771 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -85,7 +85,7 @@ public class TestLeftOuterHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -108,7 +108,8 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -137,7 +138,8 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -176,7 +178,8 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -228,8 +231,8 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
-        phone3Path);
+    Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
     
     appender5.flush();
@@ -270,9 +273,9 @@ public class TestLeftOuterHashJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
 
-    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(),
+    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(),
         new Path(dep3.getPath()), Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
         new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
 
@@ -281,7 +284,7 @@ public class TestLeftOuterHashJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -300,9 +303,9 @@ public class TestLeftOuterHashJoinExec {
 
   @Test
   public final void testLeftOuter_HashJoinExec1() throws IOException, PlanningException {
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
         new Path(job3.getPath()), Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
         new Path(emp3.getPath()), Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
@@ -314,7 +317,7 @@ public class TestLeftOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -341,9 +344,9 @@ public class TestLeftOuterHashJoinExec {
     @Test
   public final void testLeftOuter_HashJoinExec2() throws IOException, PlanningException {
     
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
         new Path(emp3.getPath()), Integer.MAX_VALUE);
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(),
         new Path(job3.getPath()), Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -355,7 +358,7 @@ public class TestLeftOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[2]);
     LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -383,9 +386,9 @@ public class TestLeftOuterHashJoinExec {
    @Test
   public final void testLeftOuter_HashJoinExec3() throws IOException, PlanningException {
     
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(),
         new Path(emp3.getPath()), Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(),
+    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(),
         new Path(phone3.getPath()), Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
@@ -397,7 +400,7 @@ public class TestLeftOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[3]);
     LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -425,9 +428,9 @@ public class TestLeftOuterHashJoinExec {
    @Test
   public final void testLeftOuter_HashJoinExec4() throws IOException, PlanningException {
     
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, "default.emp3", emp3.getMeta(),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, "default.emp3", emp3.getMeta(),
         new Path(emp3.getPath()), Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = StorageManager.splitNG(conf, "default.phone3", phone3.getMeta(),
+    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, "default.phone3", phone3.getMeta(),
         new Path(phone3.getPath()), Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
@@ -439,7 +442,7 @@ public class TestLeftOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[4]);
     LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index b3d1f33..36dd77e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -81,7 +81,7 @@ public class TestLeftOuterNLJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -104,7 +104,8 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -133,7 +134,8 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -172,7 +174,8 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -224,8 +227,8 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
-        phone3Path);
+    Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
     
     appender5.flush();
@@ -254,9 +257,9 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException {
-    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
@@ -269,7 +272,7 @@ public class TestLeftOuterNLJoinExec {
     LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -295,9 +298,9 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException {
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
@@ -311,7 +314,7 @@ public class TestLeftOuterNLJoinExec {
     LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -340,9 +343,9 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -355,7 +358,7 @@ public class TestLeftOuterNLJoinExec {
     LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -385,9 +388,9 @@ public class TestLeftOuterNLJoinExec {
 
   @Test
   public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
@@ -400,7 +403,7 @@ public class TestLeftOuterNLJoinExec {
     LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -429,9 +432,9 @@ public class TestLeftOuterNLJoinExec {
 
     @Test
   public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
@@ -444,7 +447,7 @@ public class TestLeftOuterNLJoinExec {
     LogicalNode plan = planner.createPlan(defaultContext, context).getRootBlock().getRoot();
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
     //maybe plan results with hash join exec algorithm usage. Must convert from HashLeftOuterJoinExec into NLLeftOuterJoinExec
@@ -470,7 +473,4 @@ public class TestLeftOuterNLJoinExec {
     exec.close();
     assertEquals(0, count);
   }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index cae5de5..10d4d33 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -79,7 +79,7 @@ public class TestMergeJoinExec {
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
     FileSystem fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -89,8 +89,8 @@ public class TestMergeJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
-        employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
     for (int i = 0; i < 10; i++) {
@@ -118,7 +118,8 @@ public class TestMergeJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 10; i += 2) {
@@ -165,9 +166,9 @@ public class TestMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    FileFragment[] empFrags = sm.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(), new Path(employee.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = sm.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(), new Path(people.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
 
@@ -176,7 +177,7 @@ public class TestMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, root);
     ProjectionExec proj = (ProjectionExec) exec;
     assertTrue(proj.getChild() instanceof MergeJoinExec);