Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/16 14:15:39 UTC

[08/10] tajo git commit: TAJO-1603: Refactor StorageManager. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
index 24d2dfa..0751035 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,33 +18,28 @@
 
 package org.apache.tajo.storage;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.text.NumberFormat;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * StorageManager manages the functions of storing and reading data.
@@ -52,23 +47,7 @@ import java.util.concurrent.ConcurrentHashMap;
 * To support a specific storage such as HDFS or HBase, a concrete StorageManager should be implemented by inheriting this class.
  *
  */
-public abstract class StorageManager {
-  private final Log LOG = LogFactory.getLog(StorageManager.class);
-
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      Fragment.class
-  };
-
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
-      Configuration.class,
-      TaskAttemptId.class,
-      Schema.class,
-      TableMeta.class,
-      Path.class
-  };
+public abstract class StorageManager implements TableSpace {
 
   public static final PathFilter hiddenFileFilter = new PathFilter() {
     public boolean accept(Path p) {
@@ -80,31 +59,6 @@ public abstract class StorageManager {
   protected TajoConf conf;
   protected String storeType;
 
-  /**
-   * Cache of StorageManager.
-   * Key is manager key(warehouse path) + store type
-   */
-  private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
-
-  /**
-   * Cache of scanner handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-
-  /**
-   * Cache of appender handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Appender>>();
-
-  /**
-   * Cache of constructors for each class. Pins the classes so they
-   * can't be garbage collected until ReflectionUtils can be collected.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-      new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
   public StorageManager(String storeType) {
     this.storeType = storeType;
   }
@@ -123,6 +77,7 @@ public abstract class StorageManager {
    * @param ifNotExists Creates the table only when the table does not exist.
    * @throws java.io.IOException
    */
+  @Override
   public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
 
   /**
@@ -132,6 +87,7 @@ public abstract class StorageManager {
    * @param tableDesc
    * @throws java.io.IOException
    */
+  @Override
   public abstract void purgeTable(TableDesc tableDesc) throws IOException;
 
   /**
@@ -143,6 +99,7 @@ public abstract class StorageManager {
    * @return The list of input fragments.
    * @throws java.io.IOException
    */
+  @Override
   public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc,
                                            ScanNode scanNode) throws IOException;
 
@@ -167,21 +124,11 @@ public abstract class StorageManager {
   /**
    * Releases storage manager resources.
    */
-  public abstract void closeStorageManager();
+  @Override
+  public abstract void close();
 
 
   /**
-   * Clear all class cache
-   */
-  @VisibleForTesting
-  protected synchronized static void clearCache() {
-    CONSTRUCTOR_CACHE.clear();
-    SCANNER_HANDLER_CACHE.clear();
-    APPENDER_HANDLER_CACHE.clear();
-    storageManagers.clear();
-  }
-
-  /**
    * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is USING_STORAGE_MANAGER.
    * In general, the Repartitioner determines the partition range using the previous output statistics.
    * In special cases, such as HBase, the Repartitioner uses the result of this method.
@@ -237,19 +184,6 @@ public abstract class StorageManager {
   }
 
   /**
-   * Close StorageManager
-   * @throws java.io.IOException
-   */
-  public static void close() throws IOException {
-    synchronized(storageManagers) {
-      for (StorageManager eachStorageManager: storageManagers.values()) {
-        eachStorageManager.closeStorageManager();
-      }
-    }
-    clearCache();
-  }
-
-  /**
    * Returns the splits that will serve as input for the scan tasks. The
    * number of splits matches the number of regions in a table.
    *
@@ -263,85 +197,6 @@ public abstract class StorageManager {
   }
 
   /**
-   * Returns FileStorageManager instance.
-   *
-   * @param tajoConf Tajo system property.
-   * @return
-   * @throws java.io.IOException
-   */
-  public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
-    return getStorageManager(tajoConf, "CSV");
-  }
-
-  /**
-   * Returns the proper StorageManager instance according to the storeType.
-   *
-   * @param tajoConf Tajo system property.
-   * @param storeType Storage type
-   * @return
-   * @throws java.io.IOException
-   */
-  public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
-    FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
-    if (fileSystem != null) {
-      return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
-    } else {
-      return getStorageManager(tajoConf, storeType, null);
-    }
-  }
-
-  /**
-   * Returns the proper StorageManager instance according to the storeType
-   *
-   * @param tajoConf Tajo system property.
-   * @param storeType Storage type
-   * @param managerKey Key that can identify each storage manager(may be a path)
-   * @return
-   * @throws java.io.IOException
-   */
-  private static synchronized StorageManager getStorageManager (
-      TajoConf tajoConf, String storeType, String managerKey) throws IOException {
-
-    String typeName;
-    if (storeType.equalsIgnoreCase("HBASE")) {
-      typeName = "hbase";
-    } else {
-      typeName = "hdfs";
-    }
-
-    synchronized (storageManagers) {
-      String storeKey = typeName + "_" + managerKey;
-      StorageManager manager = storageManagers.get(storeKey);
-
-      if (manager == null) {
-        Class<? extends StorageManager> storageManagerClass =
-            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
-
-        if (storageManagerClass == null) {
-          throw new IOException("Unknown Storage Type: " + typeName);
-        }
-
-        try {
-          Constructor<? extends StorageManager> constructor =
-              (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass);
-          if (constructor == null) {
-            constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
-            constructor.setAccessible(true);
-            CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
-          }
-          manager = constructor.newInstance(new Object[]{storeType});
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-        manager.init(tajoConf);
-        storageManagers.put(storeKey, manager);
-      }
-
-      return manager;
-    }
-  }
-
-  /**
    * Returns Scanner instance.
    *
    * @param meta The table meta
@@ -351,6 +206,7 @@ public abstract class StorageManager {
    * @return Scanner instance
    * @throws java.io.IOException
    */
+  @Override
   public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
     return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target);
   }
@@ -364,6 +220,7 @@ public abstract class StorageManager {
    * @return Scanner instance
    * @throws java.io.IOException
    */
+  @Override
   public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException {
     return getScanner(meta, schema, fragment, schema);
   }
@@ -378,6 +235,7 @@ public abstract class StorageManager {
    * @return Scanner instance
    * @throws java.io.IOException
    */
+  @Override
   public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
     if (fragment.isEmpty()) {
       Scanner scanner = new NullScanner(conf, schema, meta, fragment);
@@ -389,29 +247,13 @@ public abstract class StorageManager {
     Scanner scanner;
 
     Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
-    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
+    scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
     scanner.setTarget(target.toArray());
 
     return scanner;
   }
 
   /**
-   * Returns Scanner instance.
-   *
-   * @param conf The system property
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target The output schema
-   * @return Scanner instance
-   * @throws java.io.IOException
-   */
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
-    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
-  }
-
-  /**
    * Returns Appender instance.
    * @param queryContext Query property.
    * @param taskAttemptId Task id.
@@ -429,82 +271,23 @@ public abstract class StorageManager {
     Class<? extends Appender> appenderClass;
 
     String handlerName = meta.getStoreType().toLowerCase();
-    appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+    appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
     if (appenderClass == null) {
       appenderClass = conf.getClass(
           String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
-      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+      TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
     }
 
     if (appenderClass == null) {
       throw new IOException("Unknown Storage Type: " + meta.getStoreType());
     }
 
-    appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
+    appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
 
     return appender;
   }
 
   /**
-   * Creates a scanner instance.
-   *
-   * @param theClass Concrete class of scanner
-   * @param conf System property
-   * @param schema Input schema
-   * @param meta Table meta data
-   * @param fragment The fragment for scanning
-   * @param <T>
-   * @return The scanner instance
-   */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
-                                         Fragment fragment) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
-   * Creates a scanner instance.
-   *
-   * @param theClass Concrete class of scanner
-   * @param conf System property
-   * @param taskAttemptId Task id
-   * @param meta Table meta data
-   * @param schema Input schema
-   * @param workDir Working directory
-   * @param <T>
-   * @return The scanner instance
-   */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
-                                          TableMeta meta, Schema schema, Path workDir) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
    * Return the Scanner class for the StoreType that is defined in storage-default.xml.
    *
    * @param storeType store type
@@ -513,11 +296,11 @@ public abstract class StorageManager {
    */
   public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
     String handlerName = storeType.toLowerCase();
-    Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
+    Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
     if (scannerClass == null) {
       scannerClass = conf.getClass(
           String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
-      SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+      TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
     }
 
     if (scannerClass == null) {
@@ -550,6 +333,7 @@ public abstract class StorageManager {
    * @param outSchema  The output schema of select query for inserting.
    * @throws java.io.IOException
    */
+  @Override
   public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
     // nothing to do
   }
@@ -563,7 +347,9 @@ public abstract class StorageManager {
    * @return The list of storage specified rewrite rules
    * @throws java.io.IOException
    */
-  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
+  @Override
+  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
+      throws IOException {
     return null;
   }
 
@@ -580,375 +366,8 @@ public abstract class StorageManager {
    * @return Saved path
    * @throws java.io.IOException
    */
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+  @Override
+  public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
                                LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException {
-    return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true);
-  }
-
-  /**
-   * Finalizes result data. Tajo stores result data in the staging directory.
-   * If the query fails, clean up the staging directory.
-   * Otherwise the query is successful, move to the final directory from the staging directory.
-   *
-   * @param queryContext The query property
-   * @param finalEbId The final execution block id
-   * @param plan The query plan
-   * @param schema The final output schema
-   * @param tableDesc The description of the target table
-   * @param changeFileSeq If true change result file name with max sequence.
-   * @return Saved path
-   * @throws java.io.IOException
-   */
-  protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc, boolean changeFileSeq) throws IOException {
-    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
-    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-    Path finalOutputDir;
-    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
-      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
-      try {
-        FileSystem fs = stagingResultDir.getFileSystem(conf);
-
-        if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO
-
-          // It moves the original table into the temporary location.
-          // Then it moves the new result table into the original table location.
-          // Upon failed, it recovers the original table if possible.
-          boolean movedToOldTable = false;
-          boolean committed = false;
-          Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
-          ContentSummary summary = fs.getContentSummary(stagingResultDir);
-
-          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) {
-            // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
-            // renaming directory.
-            Map<Path, Path> renameDirs = TUtil.newHashMap();
-            // This is a map for recovering existing partition directory. A key is current directory and a value is
-            // temporary directory to back up.
-            Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
-            try {
-              if (!fs.exists(finalOutputDir)) {
-                fs.mkdirs(finalOutputDir);
-              }
-
-              visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
-                  renameDirs, oldTableDir);
-
-              // Rename target partition directories
-              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                // Backup existing data files for recovering
-                if (fs.exists(entry.getValue())) {
-                  String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
-                      oldTableDir.toString());
-                  Path recoveryPath = new Path(recoveryPathString);
-                  fs.rename(entry.getValue(), recoveryPath);
-                  fs.exists(recoveryPath);
-                  recoveryDirs.put(entry.getValue(), recoveryPath);
-                }
-                // Delete existing directory
-                fs.delete(entry.getValue(), true);
-                // Rename staging directory to final output directory
-                fs.rename(entry.getKey(), entry.getValue());
-              }
-
-            } catch (IOException ioe) {
-              // Remove created dirs
-              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
-                fs.delete(entry.getValue(), true);
-              }
-
-              // Recover renamed dirs
-              for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
-                fs.delete(entry.getValue(), true);
-                fs.rename(entry.getValue(), entry.getKey());
-              }
-
-              throw new IOException(ioe.getMessage());
-            }
-          } else { // no partition
-            try {
-
-              // if the final output dir exists, move all contents to the temporary table dir.
-              // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
-              if (fs.exists(finalOutputDir)) {
-                fs.mkdirs(oldTableDir);
-
-                for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
-                  fs.rename(status.getPath(), oldTableDir);
-                }
-
-                movedToOldTable = fs.exists(oldTableDir);
-              } else { // if the parent does not exist, make its parent directory.
-                fs.mkdirs(finalOutputDir);
-              }
-
-              // Move the results to the final output dir.
-              for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                fs.rename(status.getPath(), finalOutputDir);
-              }
-
-              // Check the final output dir
-              committed = fs.exists(finalOutputDir);
-
-            } catch (IOException ioe) {
-              // recover the old table
-              if (movedToOldTable && !committed) {
-
-                // if commit is failed, recover the old data
-                for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
-                  fs.delete(status.getPath(), true);
-                }
-
-                for (FileStatus status : fs.listStatus(oldTableDir)) {
-                  fs.rename(status.getPath(), finalOutputDir);
-                }
-              }
-
-              throw new IOException(ioe.getMessage());
-            }
-          }
-        } else {
-          String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
-
-          if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table
-
-            NumberFormat fmt = NumberFormat.getInstance();
-            fmt.setGroupingUsed(false);
-            fmt.setMinimumIntegerDigits(3);
-
-            if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
-              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                if (eachFile.isFile()) {
-                  LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
-                  continue;
-                }
-                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
-              }
-            } else {
-              int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
-              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
-                if (eachFile.getPath().getName().startsWith("_")) {
-                  continue;
-                }
-                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
-              }
-            }
-            // check that all files were moved and remove empty dirs
-            verifyAllFileMoved(fs, stagingResultDir);
-            FileStatus[] files = fs.listStatus(stagingResultDir);
-            if (files != null && files.length != 0) {
-              for (FileStatus eachFile: files) {
-                LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-              }
-            }
-          } else { // CREATE TABLE AS SELECT (CTAS)
-            if (fs.exists(finalOutputDir)) {
-              for (FileStatus status : fs.listStatus(stagingResultDir)) {
-                fs.rename(status.getPath(), finalOutputDir);
-              }
-            } else {
-              fs.rename(stagingResultDir, finalOutputDir);
-            }
-            LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
-          }
-        }
-
-        // remove the staging directory if the final output dir is given.
-        Path stagingDirRoot = stagingDir.getParent();
-        fs.delete(stagingDirRoot, true);
-      } catch (Throwable t) {
-        LOG.error(t);
-        throw new IOException(t);
-      }
-    } else {
-      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-    }
-
-    return finalOutputDir;
-  }
-
-  /**
-   * Attach the sequence number to the output file name and then move the file into the final result path.
-   *
-   * @param fs FileSystem
-   * @param stagingResultDir The staging result dir
-   * @param fileStatus The file status
-   * @param finalOutputPath Final output path
-   * @param nf Number format
-   * @param fileSeq The sequence number
-   * @throws java.io.IOException
-   */
-  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
-                                          FileStatus fileStatus, Path finalOutputPath,
-                                          NumberFormat nf,
-                                          int fileSeq, boolean changeFileSeq) throws IOException {
-    if (fileStatus.isDirectory()) {
-      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-      if (subPath != null) {
-        Path finalSubPath = new Path(finalOutputPath, subPath);
-        if (!fs.exists(finalSubPath)) {
-          fs.mkdirs(finalSubPath);
-        }
-        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
-        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
-          if (eachFile.getPath().getName().startsWith("_")) {
-            continue;
-          }
-          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
-        }
-      } else {
-        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
-      }
-    } else {
-      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
-      if (subPath != null) {
-        Path finalSubPath = new Path(finalOutputPath, subPath);
-        if (changeFileSeq) {
-          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
-        }
-        if (!fs.exists(finalSubPath.getParent())) {
-          fs.mkdirs(finalSubPath.getParent());
-        }
-        if (fs.exists(finalSubPath)) {
-          throw new IOException("Already exists data file:" + finalSubPath);
-        }
-        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
-        if (success) {
-          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
-              "to final output[" + finalSubPath + "]");
-        } else {
-          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
-              "to final output[" + finalSubPath + "]");
-        }
-      }
-    }
-  }
-
-  /**
-   * Removes the path of the parent.
-   * @param parentPath
-   * @param childPath
-   * @return
-   */
-  private String extractSubPath(Path parentPath, Path childPath) {
-    String parentPathStr = parentPath.toUri().getPath();
-    String childPathStr = childPath.toUri().getPath();
-
-    if (parentPathStr.length() > childPathStr.length()) {
-      return null;
-    }
-
-    int index = childPathStr.indexOf(parentPathStr);
-    if (index != 0) {
-      return null;
-    }
-
-    return childPathStr.substring(parentPathStr.length() + 1);
-  }
-
-  /**
-   * Attach the sequence number to a path.
-   *
-   * @param path Path
-   * @param seq sequence number
-   * @param nf Number format
-   * @return New path attached with sequence number
-   * @throws java.io.IOException
-   */
-  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
-    String[] tokens = path.getName().split("-");
-    if (tokens.length != 4) {
-      throw new IOException("Wrong result file name:" + path);
-    }
-    return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
-  }
-
-  /**
-   * Make sure all files are moved.
-   * @param fs FileSystem
-   * @param stagingPath The staging directory
-   * @return
-   * @throws java.io.IOException
-   */
-  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
-    FileStatus[] files = fs.listStatus(stagingPath);
-    if (files != null && files.length != 0) {
-      for (FileStatus eachFile: files) {
-        if (eachFile.isFile()) {
-          LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
-          return false;
-        } else {
-          if (verifyAllFileMoved(fs, eachFile.getPath())) {
-            fs.delete(eachFile.getPath(), false);
-          } else {
-            return false;
-          }
-        }
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * This method recursively builds a rename map from staging directories to their final output directories.
-   * If some data files already exist, it deletes them to avoid duplicated data.
-   *
-   *
-   * @param fs
-   * @param stagingPath
-   * @param outputPath
-   * @param stagingParentPathString
-   * @throws java.io.IOException
-   */
-  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
-                                         String stagingParentPathString,
-                                         Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
-    FileStatus[] files = fs.listStatus(stagingPath);
-
-    for(FileStatus eachFile : files) {
-      if (eachFile.isDirectory()) {
-        Path oldPath = eachFile.getPath();
-
-        // Make recover directory.
-        String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
-            oldTableDir.toString());
-        Path recoveryPath = new Path(recoverPathString);
-        if (!fs.exists(recoveryPath)) {
-          fs.mkdirs(recoveryPath);
-        }
-
-        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
-            renameDirs, oldTableDir);
-        // Find last order partition for renaming
-        String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
-            outputPath.toString());
-        Path newPath = new Path(newPathString);
-        if (!isLeafDirectory(fs, eachFile.getPath())) {
-          renameDirs.put(eachFile.getPath(), newPath);
-        } else {
-          if (!fs.exists(newPath)) {
-            fs.mkdirs(newPath);
-          }
-        }
-      }
-    }
-  }
-
-  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
-    boolean retValue = false;
-
-    FileStatus[] files = fs.listStatus(path);
-    for (FileStatus file : files) {
-      if (fs.isDirectory(file.getPath())) {
-        retValue = true;
-        break;
-      }
-    }
-
-    return retValue;
-  }
+                               TableDesc tableDesc) throws IOException;
 }
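
For illustration, a concrete storage manager after this refactoring only extends StorageManager (which now implements TableSpace) and fills in the abstract operations shown above; closeStorageManager() has become close(), and commitOutputData() is now abstract. A minimal sketch follows, assuming a hypothetical InMemoryStorageManager and eliding any abstract members that fall outside this diff:

    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.ExecutionBlockId;
    import org.apache.tajo.OverridableConf;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableDesc;
    import org.apache.tajo.plan.LogicalPlan;
    import org.apache.tajo.plan.logical.ScanNode;
    import org.apache.tajo.storage.StorageManager;
    import org.apache.tajo.storage.fragment.Fragment;

    // Hypothetical subclass for illustration; not part of this commit.
    public class InMemoryStorageManager extends StorageManager {

      public InMemoryStorageManager(String storeType) {
        super(storeType);                 // store type key, e.g. "CSV"
      }

      @Override
      public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
        // allocate whatever backing structure this storage needs
      }

      @Override
      public void purgeTable(TableDesc tableDesc) throws IOException {
        // drop the table's data
      }

      @Override
      public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode)
          throws IOException {
        // a real implementation returns one Fragment per scan unit
        return Collections.emptyList();
      }

      @Override
      public void close() {               // formerly closeStorageManager()
        // release connections, caches, and other resources
      }

      @Override
      public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
                                   LogicalPlan plan, Schema schema,
                                   TableDesc tableDesc) throws IOException {
        // publish staged result data and return the final output path
        return null;
      }
    }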

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
new file mode 100644
index 0000000..ef4aa9a
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpace.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * It manages each tablespace, e.g., HDFS, the local file system, or Amazon S3.
+ */
+public interface TableSpace extends Closeable {
+  //public void format() throws IOException;
+
+  void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
+
+  void purgeTable(TableDesc tableDesc) throws IOException;
+
+  List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException;
+
+  List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException;
+
+//  public void renameTable() throws IOException;
+//
+//  public void truncateTable() throws IOException;
+//
+//  public long availableCapacity() throws IOException;
+//
+//  public long totalCapacity() throws IOException;
+
+  Scanner getScanner(TableMeta meta, Schema schema, CatalogProtos.FragmentProto fragment, Schema target) throws IOException;
+
+  Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException;
+
+  Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException;
+
+  Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
+                               LogicalPlan plan, Schema schema,
+                               TableDesc tableDesc) throws IOException;
+
+  void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
+
+  List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException;
+
+  void close() throws IOException;
+}
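
The interface covers the full lifecycle a caller needs: DDL (createTable, purgeTable), split planning (getSplits), scanning (getScanner), and commit (commitOutputData). A rough read-path sketch against any TableSpace implementation follows; the helper class and its arguments are hypothetical, while the init/next/close cycle is the existing Tajo Scanner contract:

    import java.io.IOException;

    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.storage.Scanner;
    import org.apache.tajo.storage.TableSpace;
    import org.apache.tajo.storage.fragment.Fragment;

    public class TableSpaceReadExample {
      // Counts the tuples of a single fragment through the TableSpace API.
      static long countRows(TableSpace space, TableMeta meta, Schema schema, Fragment fragment)
          throws IOException {
        // The three-argument form uses the input schema as the target schema.
        Scanner scanner = space.getScanner(meta, schema, fragment);
        scanner.init();
        try {
          long rows = 0;
          while (scanner.next() != null) {
            rows++;
          }
          return rows;
        } finally {
          scanner.close();
        }
      }
    }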

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
new file mode 100644
index 0000000..42a5e07
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * It manages the available tablespaces and caches TableSpace instances.
+ */
+public class TableSpaceManager {
+
+  /**
+   * Cache of scanner handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+  /**
+   * Cache of appender handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Appender>>();
+  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+      Configuration.class,
+      Schema.class,
+      TableMeta.class,
+      Fragment.class
+  };
+  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+      Configuration.class,
+      TaskAttemptId.class,
+      Schema.class,
+      TableMeta.class,
+      Path.class
+  };
+  /**
+   * Cache of StorageManager instances.
+   * The key is the manager key (warehouse path) plus the store type.
+   */
+  private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
+  /**
+   * Cache of constructors for each class. Pins the classes so they
+   * can't be garbage collected until ReflectionUtils can be collected.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
+      new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  /**
+   * Clear all class cache
+   */
+  @VisibleForTesting
+  protected synchronized static void clearCache() {
+    CONSTRUCTOR_CACHE.clear();
+    SCANNER_HANDLER_CACHE.clear();
+    APPENDER_HANDLER_CACHE.clear();
+    storageManagers.clear();
+  }
+
+  /**
+   * Closes every cached StorageManager and clears all class caches.
+   * @throws java.io.IOException
+   */
+  public static void shutdown() throws IOException {
+    synchronized(storageManagers) {
+      for (StorageManager eachStorageManager: storageManagers.values()) {
+        eachStorageManager.close();
+      }
+    }
+    clearCache();
+  }
+
+  /**
+   * Returns FileStorageManager instance.
+   *
+   * @param tajoConf Tajo system property.
+   * @return
+   * @throws IOException
+   */
+  public static StorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
+    return getStorageManager(tajoConf, "CSV");
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @return
+   * @throws IOException
+   */
+  public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+    FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+    if (fileSystem != null) {
+      return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
+    } else {
+      return getStorageManager(tajoConf, storeType, null);
+    }
+  }
+
+  /**
+   * Returns the proper StorageManager instance according to the storeType
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @param managerKey Key that can identify each storage manager(may be a path)
+   * @return
+   * @throws IOException
+   */
+  private static synchronized StorageManager getStorageManager (
+      TajoConf tajoConf, String storeType, String managerKey) throws IOException {
+
+    String typeName;
+    if (storeType.equalsIgnoreCase("HBASE")) {
+      typeName = "hbase";
+    } else {
+      typeName = "hdfs";
+    }
+
+    synchronized (storageManagers) {
+      String storeKey = typeName + "_" + managerKey;
+      StorageManager manager = storageManagers.get(storeKey);
+
+      if (manager == null) {
+        Class<? extends StorageManager> storageManagerClass =
+            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, StorageManager.class);
+
+        if (storageManagerClass == null) {
+          throw new IOException("Unknown Storage Type: " + typeName);
+        }
+
+        try {
+          Constructor<? extends StorageManager> constructor =
+              (Constructor<? extends StorageManager>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+          if (constructor == null) {
+            constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
+            constructor.setAccessible(true);
+            CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
+          }
+          manager = constructor.newInstance(new Object[]{storeType});
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        manager.init(tajoConf);
+        storageManagers.put(storeKey, manager);
+      }
+
+      return manager;
+    }
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param conf The system property
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws IOException
+   */
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
+  }
+
+  /**
+   * Creates a scanner instance.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param schema Input schema
+   * @param meta Table meta data
+   * @param fragment The fragment for scanning
+   * @param <T>
+   * @return The scanner instance
+   */
+  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+                                         Fragment fragment) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Creates an appender instance.
+   *
+   * @param theClass Concrete class of appender
+   * @param conf System property
+   * @param taskAttemptId Task id
+   * @param meta Table meta data
+   * @param schema Input schema
+   * @param workDir Working directory
+   * @param <T>
+   * @return The appender instance
+   */
+  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
+                                          TableMeta meta, Schema schema, Path workDir) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+}
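
TableSpaceManager now owns the static lookup, caching, and reflective instantiation that previously lived in StorageManager, so call sites change mechanically (see the hunks below). A sketch of the lookup and shutdown flow, assuming the manager classes are registered in the configuration (tajo.storage.manager.hdfs.class and tajo.storage.manager.hbase.class, as read by getStorageManager above):

    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.storage.StorageManager;
    import org.apache.tajo.storage.TableSpaceManager;

    public class TableSpaceManagerExample {
      public static void main(String[] args) throws Exception {
        TajoConf conf = new TajoConf();

        // Cached lookups: the same warehouse file system and store type
        // always yield the same manager instance.
        StorageManager fileSm  = TableSpaceManager.getFileStorageManager(conf);      // "CSV"-backed
        StorageManager hbaseSm = TableSpaceManager.getStorageManager(conf, "HBASE");

        // ... run scans and appends through the managers ...

        // Replaces the old static StorageManager.close(): closes every cached
        // manager and clears the class and constructor caches.
        TableSpaceManager.shutdown();
      }
    }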

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 7f3cb04..09a86b4 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -28,7 +28,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
@@ -47,7 +47,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
     super.init();
 
     Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
-    HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, "HBASE"))
+    HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
         .getConnection(hbaseConf);
     htable = hconn.getTable(columnMapping.getHbaseTableName());
     htable.setAutoFlushTo(false);

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index df60bb3..24bfd4d 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -36,10 +36,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.BytesUtils;
 
@@ -184,7 +181,7 @@ public class HBaseScanner implements Scanner {
     }
 
     if (htable == null) {
-      HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, "HBASE"))
+      HConnection hconn = ((HBaseStorageManager) TableSpaceManager.getStorageManager(conf, "HBASE"))
           .getConnection(hbaseConf);
       htable = hconn.getTable(fragment.getHbaseTableName());
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
index 5f0695c..3653574 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseStorageManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage.hbase;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@ import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.CreateTableNode;
@@ -78,7 +80,7 @@ public class HBaseStorageManager extends StorageManager {
   }
 
   @Override
-  public void closeStorageManager() {
+  public void close() {
     synchronized (connMap) {
       for (HConnection eachConn: connMap.values()) {
         try {
@@ -942,6 +944,8 @@ public class HBaseStorageManager extends StorageManager {
     if (tableDesc == null) {
       throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
     }
+    Preconditions.checkArgument(tableDesc.getName() != null && tableDesc.getPath() == null);
+
     Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
 
@@ -960,29 +964,23 @@ public class HBaseStorageManager extends StorageManager {
     }
     committer.commitJob(jobContext);
 
-    if (tableDesc.getName() == null && tableDesc.getPath() != null) {
-
-      // insert into location
-      return super.commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, false);
-    } else {
-      // insert into table
-      String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY);
+    // insert into table
+    String tableName = tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY);
 
-      HTable htable = new HTable(hbaseConf, tableName);
+    HTable htable = new HTable(hbaseConf, tableName);
+    try {
+      LoadIncrementalHFiles loadIncrementalHFiles = null;
       try {
-        LoadIncrementalHFiles loadIncrementalHFiles = null;
-        try {
-          loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf);
-        } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          throw new IOException(e.getMessage(), e);
-        }
-        loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable);
-
-        return stagingResultDir;
-      } finally {
-        htable.close();
+        loadIncrementalHFiles = new LoadIncrementalHFiles(hbaseConf);
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new IOException(e.getMessage(), e);
       }
+      loadIncrementalHFiles.doBulkLoad(stagingResultDir, htable);
+
+      return stagingResultDir;
+    } finally {
+      htable.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
index aa7aa28..39ccf44 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
@@ -19,14 +19,13 @@
 package org.apache.tajo.storage.hbase;
 
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.util.Pair;
 import org.junit.Test;
 
@@ -48,7 +47,7 @@ public class TestHBaseStorageManager {
     scanNode.setQual(evalNodeA);
 
     HBaseStorageManager storageManager =
-        (HBaseStorageManager) StorageManager.getStorageManager(new TajoConf(), "HBASE");
+        (HBaseStorageManager) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE");
     List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
     assertNotNull(indexEvals);
     assertEquals(1, indexEvals.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 3daed96..c041771 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -57,7 +57,7 @@ public abstract class FileAppender implements Appender {
           throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
         }
 
-        this.path = ((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf))
+        this.path = ((FileStorageManager) TableSpaceManager.getFileStorageManager((TajoConf) conf))
             .getAppenderFilePath(taskAttemptId, workDir);
       } else {
         this.path = workDir;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
index 635dade..4efc3b7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -27,17 +27,18 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.text.NumberFormat;
@@ -864,7 +865,7 @@ public class FileStorageManager extends StorageManager {
   }
 
   @Override
-  public void closeStorageManager() {
+  public void close() {
   }
 
   @Override
@@ -876,6 +877,12 @@ public class FileStorageManager extends StorageManager {
   }
 
   @Override
+  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
+                               Schema schema, TableDesc tableDesc) throws IOException {
+    return commitOutputData(queryContext, true);
+  }
+
+  @Override
   public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
                                           Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
       throws IOException {
@@ -899,6 +906,366 @@ public class FileStorageManager extends StorageManager {
     FileStatus status = fs.getFileStatus(path);
     FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
 
-    return getSeekableScanner(conf, meta, schema, fragment, schema);
+    return TableSpaceManager.getSeekableScanner(conf, meta, schema, fragment, schema);
+  }
+
+  /**
+   * Finalizes result data. Tajo stores result data in the staging directory.
+   * If the query fails, the staging directory is cleaned up.
+   * If the query succeeds, the result is moved from the staging directory to the final output directory.
+   *
+   * @param queryContext The query properties
+   * @param changeFileSeq If true, rename each result file with the next max sequence number.
+   * @return Saved path
+   * @throws java.io.IOException
+   */
+  protected Path commitOutputData(OverridableConf queryContext, boolean changeFileSeq) throws IOException {
+    Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    Path finalOutputDir;
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+      try {
+        FileSystem fs = stagingResultDir.getFileSystem(conf);
+
+        if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO
+
+          // It moves the original table into the temporary location.
+          // Then it moves the new result table into the original table location.
+          // Upon failure, it recovers the original table if possible.
+          boolean movedToOldTable = false;
+          boolean committed = false;
+          Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+          ContentSummary summary = fs.getContentSummary(stagingResultDir);
+
+          if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty() && summary.getFileCount() > 0L) {
+            // Maps each staging partition directory to be renamed to its final output directory.
+            Map<Path, Path> renameDirs = TUtil.newHashMap();
+            // Maps each existing partition directory to the temporary directory that backs it up for recovery.
+            Map<Path, Path> recoveryDirs = TUtil.newHashMap();
+
+            try {
+              if (!fs.exists(finalOutputDir)) {
+                fs.mkdirs(finalOutputDir);
+              }
+
+              visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
+                  renameDirs, oldTableDir);
+
+              // Rename target partition directories
+              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                // Back up existing data files for recovery
+                if (fs.exists(entry.getValue())) {
+                  String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
+                      oldTableDir.toString());
+                  Path recoveryPath = new Path(recoveryPathString);
+                  fs.rename(entry.getValue(), recoveryPath);
+                  recoveryDirs.put(entry.getValue(), recoveryPath);
+                }
+                // Delete existing directory
+                fs.delete(entry.getValue(), true);
+                // Rename staging directory to final output directory
+                fs.rename(entry.getKey(), entry.getValue());
+              }
+
+            } catch (IOException ioe) {
+              // Remove created dirs
+              for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
+                fs.delete(entry.getValue(), true);
+              }
+
+              // Recover the renamed dirs
+              for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
+                fs.delete(entry.getValue(), true);
+                fs.rename(entry.getValue(), entry.getKey());
+              }
+
+              throw new IOException(ioe.getMessage(), ioe);
+            }
+          } else { // no partition
+            try {
+
+              // if the final output dir exists, move all contents to the temporary table dir.
+              // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
+              if (fs.exists(finalOutputDir)) {
+                fs.mkdirs(oldTableDir);
+
+                for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
+                  fs.rename(status.getPath(), oldTableDir);
+                }
+
+                movedToOldTable = fs.exists(oldTableDir);
+              } else { // if the final output dir does not exist, create it.
+                fs.mkdirs(finalOutputDir);
+              }
+
+              // Move the results to the final output dir.
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+
+              // Check the final output dir
+              committed = fs.exists(finalOutputDir);
+
+            } catch (IOException ioe) {
+              // recover the old table
+              if (movedToOldTable && !committed) {
+
+                // if the commit failed, recover the old data
+                for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
+                  fs.delete(status.getPath(), true);
+                }
+
+                for (FileStatus status : fs.listStatus(oldTableDir)) {
+                  fs.rename(status.getPath(), finalOutputDir);
+                }
+              }
+
+              throw new IOException(ioe.getMessage(), ioe);
+            }
+          }
+        } else {
+          String queryType = queryContext.get(QueryVars.COMMAND_TYPE);
+
+          if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table
+
+            NumberFormat fmt = NumberFormat.getInstance();
+            fmt.setGroupingUsed(false);
+            fmt.setMinimumIntegerDigits(3);
+
+            if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) {
+              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                if (eachFile.isFile()) {
+                  LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq);
+              }
+            } else {
+              int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
+              for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
+                if (eachFile.getPath().getName().startsWith("_")) {
+                  continue;
+                }
+                moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq);
+              }
+            }
+            // Check that all files were moved, and remove any empty dirs
+            verifyAllFileMoved(fs, stagingResultDir);
+            FileStatus[] files = fs.listStatus(stagingResultDir);
+            if (files != null && files.length != 0) {
+              for (FileStatus eachFile: files) {
+                LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+              }
+            }
+          } else { // CREATE TABLE AS SELECT (CTAS)
+            if (fs.exists(finalOutputDir)) {
+              for (FileStatus status : fs.listStatus(stagingResultDir)) {
+                fs.rename(status.getPath(), finalOutputDir);
+              }
+            } else {
+              fs.rename(stagingResultDir, finalOutputDir);
+            }
+            LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+          }
+        }
+
+        // remove the staging directory if the final output dir is given.
+        Path stagingDirRoot = stagingDir.getParent();
+        fs.delete(stagingDirRoot, true);
+      } catch (Throwable t) {
+        LOG.error(t);
+        throw new IOException(t);
+      }
+    } else {
+      finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    }
+
+    return finalOutputDir;
+  }
+
+  /**
+   * Attach the sequence number to the output file name and then move the file into the final result path.
+   *
+   * @param fs FileSystem
+   * @param stagingResultDir The staging result dir
+   * @param fileStatus The file status
+   * @param finalOutputPath Final output path
+   * @param nf Number format
+   * @param fileSeq The sequence number
+   * @param changeFileSeq If true, attach the sequence number to the moved file name
+   * @throws java.io.IOException
+   */
+  private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
+                                          FileStatus fileStatus, Path finalOutputPath,
+                                          NumberFormat nf,
+                                          int fileSeq, boolean changeFileSeq) throws IOException {
+    if (fileStatus.isDirectory()) {
+      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+      if (subPath != null) {
+        Path finalSubPath = new Path(finalOutputPath, subPath);
+        if (!fs.exists(finalSubPath)) {
+          fs.mkdirs(finalSubPath);
+        }
+        int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
+        for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
+          if (eachFile.getPath().getName().startsWith("_")) {
+            continue;
+          }
+          moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq);
+        }
+      } else {
+        throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
+      }
+    } else {
+      String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
+      if (subPath != null) {
+        Path finalSubPath = new Path(finalOutputPath, subPath);
+        if (changeFileSeq) {
+          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
+        }
+        if (!fs.exists(finalSubPath.getParent())) {
+          fs.mkdirs(finalSubPath.getParent());
+        }
+        if (fs.exists(finalSubPath)) {
+          throw new IOException("Already exists data file:" + finalSubPath);
+        }
+        boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
+        if (success) {
+          LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
+              "to final output[" + finalSubPath + "]");
+        } else {
+          LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
+              "to final output[" + finalSubPath + "]");
+        }
+      }
+    }
+  }
+
+  /**
+   * Removes the parent path prefix from the child path.
+   * @param parentPath The parent path to strip
+   * @param childPath The child path
+   * @return The sub path relative to the parent, or null if childPath is not under parentPath
+   */
+  private String extractSubPath(Path parentPath, Path childPath) {
+    String parentPathStr = parentPath.toUri().getPath();
+    String childPathStr = childPath.toUri().getPath();
+
+    if (parentPathStr.length() > childPathStr.length()) {
+      return null;
+    }
+
+    int index = childPathStr.indexOf(parentPathStr);
+    if (index != 0) {
+      return null;
+    }
+
+    return childPathStr.substring(parentPathStr.length() + 1);
+  }
+
+  /**
+   * Attach the sequence number to a path.
+   *
+   * @param path Path
+   * @param seq sequence number
+   * @param nf Number format
+   * @return The new file name with the sequence number attached
+   * @throws java.io.IOException
+   */
+  private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
+    String[] tokens = path.getName().split("-");
+    if (tokens.length != 4) {
+      throw new IOException("Wrong result file name:" + path);
+    }
+    return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
+  }
+
+  /**
+   * Make sure all files are moved.
+   * @param fs FileSystem
+   * @param stagingPath The staging directory
+   * @return True if no files remain under the staging path
+   * @throws java.io.IOException
+   */
+  private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+    if (files != null && files.length != 0) {
+      for (FileStatus eachFile: files) {
+        if (eachFile.isFile()) {
+          LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
+          return false;
+        } else {
+          if (verifyAllFileMoved(fs, eachFile.getPath())) {
+            fs.delete(eachFile.getPath(), false);
+          } else {
+            return false;
+          }
+        }
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Recursively builds a rename map from staging partition directories to their final output
+   * directories, and pre-creates the matching recovery directories used to back up existing data.
+   *
+   * @param fs FileSystem
+   * @param stagingPath The staging directory to visit
+   * @param outputPath The final output directory
+   * @param stagingParentPathString The staging root path, used as the prefix to replace
+   * @param renameDirs Collects the staging-to-final rename mapping
+   * @param oldTableDir The temporary directory for backing up existing data
+   * @throws java.io.IOException
+   */
+  private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
+                                         String stagingParentPathString,
+                                         Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
+    FileStatus[] files = fs.listStatus(stagingPath);
+
+    for(FileStatus eachFile : files) {
+      if (eachFile.isDirectory()) {
+        Path oldPath = eachFile.getPath();
+
+        // Make the recovery directory.
+        String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
+            oldTableDir.toString());
+        Path recoveryPath = new Path(recoverPathString);
+        if (!fs.exists(recoveryPath)) {
+          fs.mkdirs(recoveryPath);
+        }
+
+        visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
+            renameDirs, oldTableDir);
+        // Only the deepest (last-order) partition directories are renamed; intermediate ones are created.
+        String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
+            outputPath.toString());
+        Path newPath = new Path(newPathString);
+        if (!isLeafDirectory(fs, eachFile.getPath())) {
+          renameDirs.put(eachFile.getPath(), newPath);
+        } else {
+          if (!fs.exists(newPath)) {
+            fs.mkdirs(newPath);
+          }
+        }
+      }
+    }
+  }
+
+  private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
+    // Note: this returns true when the path contains at least one subdirectory
+    // (i.e. it is not the deepest level); the caller negates the result.
+    for (FileStatus file : fs.listStatus(path)) {
+      if (file.isDirectory()) {
+        return true;
+      }
+    }
+    return false;
   }
 }

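To make the file renaming in commitOutputData concrete: replaceFileNameSeq expects a four-token result file name and replaces the last token with the zero-padded sequence number produced by the NumberFormat configured above (grouping off, minimum three integer digits). A standalone sketch; the part-style file name is an assumption based on the four-token split:

    NumberFormat fmt = NumberFormat.getInstance();
    fmt.setGroupingUsed(false);
    fmt.setMinimumIntegerDigits(3);

    String[] tokens = "part-01-000001-000".split("-");
    // tokens.length must be 4, otherwise replaceFileNameSeq throws IOException
    String renamed = tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + fmt.format(7);
    // renamed == "part-01-000001-007"
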
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 4635b76..1846ed6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -30,11 +30,9 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Pair;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -84,7 +82,7 @@ public class HashShuffleAppenderManager {
         if (!fs.exists(dataFile.getParent())) {
           fs.mkdirs(dataFile.getParent());
         }
-        FileAppender appender = (FileAppender)((FileStorageManager)StorageManager.getFileStorageManager(tajoConf))
+        FileAppender appender = (FileAppender)((FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf))
             .getAppender(meta, outSchema, dataFile);
         appender.enableStats();
         appender.init();

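The appender lifecycle used here is the same one the storage tests below exercise: look up the manager, obtain an appender, enable stats before init, then write and close. A minimal sketch, assuming meta, outSchema, dataFile, and the shuffled tuples are prepared as in the surrounding method:

    FileAppender appender = (FileAppender)
        ((FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf))
            .getAppender(meta, outSchema, dataFile);
    appender.enableStats();   // collect TableStats while writing
    appender.init();
    // for each shuffled tuple: appender.addTuple(tuple);
    appender.flush();
    appender.close();
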
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 68a2cf2..779f908 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
@@ -123,7 +122,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -155,7 +154,7 @@ public class TestCompressionStorages {
     FileFragment[] tablets = new FileFragment[1];
     tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
 
-    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
 
     if (storeType.equalsIgnoreCase("CSV")) {
       if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {

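The test pattern here is a write-then-scan round trip: append rows through the manager, then read the whole file back as a single fragment. A condensed sketch of that flow, assuming fs, testDir, meta, schema, and codec are set up as earlier in the test:

    Path tablePath = new Path(testDir, "Compression_" + codec.getSimpleName());
    Appender appender = ((FileStorageManager) TableSpaceManager.getFileStorageManager(conf))
        .getAppender(meta, schema, tablePath);
    appender.enableStats();
    appender.init();
    // ... write tuples, then appender.close() ...

    long fileLen = fs.getFileStatus(tablePath).getLen();
    FileFragment fragment = new FileFragment(tablePath.getName(), tablePath, 0, fileLen);
    Scanner scanner = TableSpaceManager.getFileStorageManager(conf)
        .getScanner(meta, schema, fragment, schema);
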
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 6e15c51..2260d2a 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
@@ -104,7 +103,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple tuple;
@@ -126,7 +125,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     assertNotNull(scanner.next());
@@ -148,7 +147,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
     FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
-    Scanner scanner =  StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     try {
@@ -167,7 +166,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment = getFileFragment("testErrorTolerance3.json");
-    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     try {

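These tests drive the text scanner's error tolerance through the TEXT_ERROR_TOLERANCE_MAXNUM option; from the fixtures above, "-1" appears to tolerate any number of malformed rows, "0" none, and "1" a single row. A minimal read loop in the same style, assuming conf, schema, and a JSON fragment as in the test class:

    TableMeta meta = CatalogUtil.newTableMeta("JSON");
    meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1"); // allow one bad row
    Scanner scanner = TableSpaceManager.getFileStorageManager(conf)
        .getScanner(meta, schema, fragment);
    scanner.init();
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {
      // malformed rows beyond the tolerance surface as an exception from next()
    }
    scanner.close();
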
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
index 7d5eee1..41c6c67 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdfs.*;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
@@ -81,7 +80,7 @@ public class TestFileStorageManager {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
     fs.mkdirs(path.getParent());
-    FileStorageManager fileStorageManager = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    FileStorageManager fileStorageManager = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
     assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
 
 		Appender appender = fileStorageManager.getAppender(meta, schema, path);
@@ -128,7 +127,7 @@ public class TestFileStorageManager {
       }
 
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf);
+      FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf);
       assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
@@ -182,7 +181,7 @@ public class TestFileStorageManager {
         DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
       }
       assertTrue(fs.exists(tablePath));
-      FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(tajoConf);
+      FileStorageManager sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(tajoConf);
       assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       Schema schema = new Schema();
@@ -221,11 +220,11 @@ public class TestFileStorageManager {
 
     try {
       /* Local FileSystem */
-      FileStorageManager sm = (FileStorageManager)StorageManager.getStorageManager(conf, "CSV");
+      FileStorageManager sm = (FileStorageManager) TableSpaceManager.getStorageManager(conf, "CSV");
       assertEquals(fs.getUri(), sm.getFileSystem().getUri());
 
       /* Distributed FileSystem */
-      sm = (FileStorageManager)StorageManager.getStorageManager(tajoConf, "CSV");
+      sm = (FileStorageManager) TableSpaceManager.getStorageManager(tajoConf, "CSV");
       assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
       assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri());
     } finally {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index b4a60fc..1222fae 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
@@ -58,7 +57,7 @@ public class TestFileSystems {
   public TestFileSystems(FileSystem fs) throws IOException {
     this.fs = fs;
     this.conf = new TajoConf(fs.getConf());
-    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf);
+    sm = (FileStorageManager) TableSpaceManager.getFileStorageManager(conf);
     testDir = getTestDir(this.fs, TEST_PATH);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 1078b84..266f906 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.io.compress.DeflateCodec;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
@@ -66,7 +65,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -119,7 +118,7 @@ public class TestLineReader {
     meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -177,7 +176,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
 
     Path tablePath = new Path(testDir, "testLineDelimitedReader");
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -280,7 +279,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
-    FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();

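TestLineReader exercises the text appender both plain and compressed; the codec is chosen purely through table meta options rather than a different appender class. A minimal sketch of the compressed variant, assuming conf, schema, and testDir as in the test (the file name is illustrative):

    TableMeta meta = CatalogUtil.newTableMeta("TEXT");
    meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());

    Path tablePath = new Path(testDir, "lineWithDeflate.data");
    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf)
        .getAppender(null, null, meta, schema, tablePath);
    appender.enableStats();
    appender.init();
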
http://git-wip-us.apache.org/repos/asf/tajo/blob/5491f0e7/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 2c856e1..82acaf3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
@@ -95,7 +94,7 @@ public class TestMergeScanner {
     conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getFileStorageManager(conf);
+    sm = TableSpaceManager.getFileStorageManager(conf);
   }
 
   @Test
@@ -115,7 +114,7 @@ public class TestMergeScanner {
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+    Appender appender1 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
     appender1.enableStats();
     appender1.init();
     int tupleNum = 10000;
@@ -137,7 +136,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+    Appender appender2 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
     appender2.enableStats();
     appender2.init();