You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/24 00:49:15 UTC

[2/7] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces.

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 845c2d7..5fac0cf 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.storage.hbase;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,12 +43,12 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.Bytes;
@@ -60,6 +59,7 @@ import org.apache.tajo.util.TUtil;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.net.URI;
 import java.util.*;
 
 /**
@@ -68,14 +68,44 @@ import java.util.*;
 public class HBaseTablespace extends Tablespace {
   private static final Log LOG = LogFactory.getLog(HBaseTablespace.class);
 
+  public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty(false, true, true, false);
+
+  public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true);
+
+  private Configuration hbaseConf;
+
+  private final static SortedInsertRewriter REWRITE_RULE = new SortedInsertRewriter();
+
   private Map<HConnectionKey, HConnection> connMap = new HashMap<HConnectionKey, HConnection>();
 
-  public HBaseTablespace(String storeType) {
-    super(storeType);
+  public HBaseTablespace(String spaceName, URI uri) {
+    super(spaceName, uri);
   }
 
   @Override
   public void storageInit() throws IOException {
+    this.hbaseConf = HBaseConfiguration.create(conf);
+    String zkQuorum = extractQuorum(uri);
+    String [] splits = zkQuorum.split(":");
+    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, splits[0]);
+    hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, splits[1]);
+  }
+
+  @Override
+  public void setConfig(String name, String value) {
+  }
+
+  @Override
+  public void setConfigs(Map<String, String> configs) {
+  }
+
+  public Configuration getHbaseConf() {
+    return hbaseConf;
+  }
+
+  @Override
+  public long getTableVolume(URI uri) throws IOException {
+    return 0;
   }
 
   @Override
@@ -93,13 +123,13 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
-    createTable(tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
+    createTable(tableDesc.getUri(), tableDesc.getMeta(), tableDesc.getSchema(), tableDesc.isExternal(), ifNotExists);
     TableStats stats = new TableStats();
     stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
     tableDesc.setStats(stats);
   }
 
-  private void createTable(TableMeta tableMeta, Schema schema,
+  private void createTable(URI uri, TableMeta tableMeta, Schema schema,
                            boolean isExternal, boolean ifNotExists) throws IOException {
     String hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY, "");
     if (hbaseTableName == null || hbaseTableName.trim().isEmpty()) {
@@ -113,7 +143,7 @@ public class HBaseTablespace extends Tablespace {
       throw new IOException("Columns property has more entry than Tajo table columns");
     }
 
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
     int numRowKeys = 0;
     boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
     for (int i = 0; i < isRowKeyMappings.length; i++) {
@@ -138,8 +168,7 @@ public class HBaseTablespace extends Tablespace {
       }
     }
 
-    Configuration hConf = getHBaseConfiguration(conf, tableMeta);
-    HBaseAdmin hAdmin =  new HBaseAdmin(hConf);
+    HBaseAdmin hAdmin =  new HBaseAdmin(getHbaseConf());
 
     try {
       if (isExternal) {
@@ -210,7 +239,7 @@ public class HBaseTablespace extends Tablespace {
       return null;
     }
 
-    ColumnMapping columnMapping = new ColumnMapping(schema, meta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, meta.getOptions());
     boolean[] isBinaryColumns = columnMapping.getIsBinaryColumns();
     boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
 
@@ -291,45 +320,23 @@ public class HBaseTablespace extends Tablespace {
   }
 
   /**
-   * Creates Configuration instance and sets with hbase connection options.
+   * Extracts the ZooKeeper quorum addresses from an HBase tablespace URI.
+   * For example, given the URI 'hbase:zk://host1:2171,host2:2172,host3:2173/table1',
+   * <code>extractQuorum</code> returns only 'host1:2171,host2:2172,host3:2173'.
    *
-   * @param conf
-   * @param tableMeta
-   * @return
-   * @throws java.io.IOException
+   * @param uri Hbase Tablespace URI
+   * @return Quorum addresses
    */
-  public static Configuration getHBaseConfiguration(Configuration conf, TableMeta tableMeta) throws IOException {
-    Configuration hbaseConf = (conf == null) ? HBaseConfiguration.create() : HBaseConfiguration.create(conf);
-
-    String zkQuorum = hbaseConf.get(HConstants.ZOOKEEPER_QUORUM);
-    if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_QUORUM_KEY)) {
-      zkQuorum = tableMeta.getOption(HBaseStorageConstants.META_ZK_QUORUM_KEY, "");
-      hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
-    }
-
-    if (zkQuorum == null || zkQuorum.trim().isEmpty()) {
-      throw new IOException("HBase mapped table is required a '" +
-          HBaseStorageConstants.META_ZK_QUORUM_KEY + "' attribute.");
-    }
+  static String extractQuorum(URI uri) {
+    String uriStr = uri.toString();
+    int start = uriStr.indexOf("/") + 2;
+    int pathIndex = uriStr.lastIndexOf("/");
 
-    String zkPort = hbaseConf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-    if (tableMeta.containsOption(HBaseStorageConstants.META_ZK_CLIENT_PORT)) {
-      zkPort = tableMeta.getOption(HBaseStorageConstants.META_ZK_CLIENT_PORT, "");
-      hbaseConf.set(HConstants.ZOOKEEPER_CLIENT_PORT, zkPort);
-    }
-
-    if (zkPort == null || zkPort.trim().isEmpty()) {
-      throw new IOException("HBase mapped table is required a '" +
-        HBaseStorageConstants.META_ZK_CLIENT_PORT + "' attribute.");
-    }
-
-    for (Map.Entry<String, String> eachOption: tableMeta.getOptions().getAllKeyValus().entrySet()) {
-      String key = eachOption.getKey();
-      if (key.startsWith(HConstants.ZK_CFG_PROPERTY_PREFIX)) {
-        hbaseConf.set(key, eachOption.getValue());
-      }
+    if (pathIndex < start) {
+      return uriStr.substring(start);
+    } else {
+      return uriStr.substring(start, pathIndex);
     }
-    return hbaseConf;
   }
 
   /**
@@ -348,7 +355,7 @@ public class HBaseTablespace extends Tablespace {
     }
     TableName hTableName = TableName.valueOf(hbaseTableName);
 
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
 
     HTableDescriptor hTableDescriptor = new HTableDescriptor(hTableName);
 
@@ -369,7 +376,7 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public void purgeTable(TableDesc tableDesc) throws IOException {
-    HBaseAdmin hAdmin =  new HBaseAdmin(getHBaseConfiguration(conf, tableDesc.getMeta()));
+    HBaseAdmin hAdmin =  new HBaseAdmin(hbaseConf);
 
     try {
       HTableDescriptor hTableDesc = parseHTableDescriptor(tableDesc.getMeta(), tableDesc.getSchema());
@@ -381,6 +388,11 @@ public class HBaseTablespace extends Tablespace {
     }
   }
 
+  @Override
+  public URI getTableUri(String databaseName, String tableName) {
+    return URI.create(uri.toString() + "/" + tableName);
+  }
+
   /**
    * Returns columns which are mapped to the rowkey of the hbase table.
    *
@@ -389,7 +401,7 @@ public class HBaseTablespace extends Tablespace {
    * @throws java.io.IOException
    */
   private Column[] getIndexableColumns(TableDesc tableDesc) throws IOException {
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
     boolean[] isRowKeyMappings = columnMapping.getIsRowKeyMappings();
     int[] rowKeyIndexes = columnMapping.getRowKeyFieldIndexes();
 
@@ -407,15 +419,14 @@ public class HBaseTablespace extends Tablespace {
 
   @Override
   public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, ScanNode scanNode) throws IOException {
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
+    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
 
     List<IndexPredication> indexPredications = getIndexPredications(columnMapping, tableDesc, scanNode);
-    Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
     HTable htable = null;
     HBaseAdmin hAdmin = null;
 
     try {
-      htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+      htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
 
       org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
       if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
@@ -424,8 +435,12 @@ public class HBaseTablespace extends Tablespace {
           throw new IOException("Expecting at least one region.");
         }
         List<Fragment> fragments = new ArrayList<Fragment>(1);
-        Fragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
-            HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, regLoc.getHostname());
+        Fragment fragment = new HBaseFragment(
+            tableDesc.getUri(),
+            fragmentId, htable.getName().getNameAsString(),
+            HConstants.EMPTY_BYTE_ARRAY,
+            HConstants.EMPTY_BYTE_ARRAY,
+            regLoc.getHostname());
         fragments.add(fragment);
         return fragments;
       }
@@ -458,7 +473,7 @@ public class HBaseTablespace extends Tablespace {
         stopRows = TUtil.newList(HConstants.EMPTY_END_ROW);
       }
 
-      hAdmin =  new HBaseAdmin(hconf);
+      hAdmin =  new HBaseAdmin(hbaseConf);
       Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
 
       // region startkey -> HBaseFragment
@@ -499,8 +514,12 @@ public class HBaseTablespace extends Tablespace {
                 prevFragment.setStopRow(fragmentStop);
               }
             } else {
-              HBaseFragment fragment = new HBaseFragment(fragmentId, htable.getName().getNameAsString(),
-                  fragmentStart, fragmentStop, location.getHostname());
+              HBaseFragment fragment = new HBaseFragment(tableDesc.getUri(),
+                  fragmentId,
+                  htable.getName().getNameAsString(),
+                  fragmentStart,
+                  fragmentStop,
+                  location.getHostname());
 
               // get region size
               boolean foundLength = false;
@@ -557,7 +576,7 @@ public class HBaseTablespace extends Tablespace {
                               TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
       throws IOException {
     if ("true".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
-      return new HBasePutAppender(conf, taskAttemptId, schema, meta, workDir);
+      return new HBasePutAppender(conf, uri, taskAttemptId, schema, meta, workDir);
     } else {
       return super.getAppender(queryContext, taskAttemptId, meta, schema, workDir);
     }
@@ -566,17 +585,16 @@ public class HBaseTablespace extends Tablespace {
   @Override
   public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments)
       throws IOException {
-    Configuration hconf = getHBaseConfiguration(conf, tableDesc.getMeta());
     HTable htable = null;
     HBaseAdmin hAdmin = null;
     try {
-      htable = new HTable(hconf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
+      htable = new HTable(hbaseConf, tableDesc.getMeta().getOption(HBaseStorageConstants.META_TABLE_KEY));
 
       org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
       if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
         return new ArrayList<Fragment>(1);
       }
-      hAdmin =  new HBaseAdmin(hconf);
+      hAdmin =  new HBaseAdmin(hbaseConf);
       Map<ServerName, ServerLoad> serverLoadMap = new HashMap<ServerName, ServerLoad>();
 
       List<Fragment> fragments = new ArrayList<Fragment>(keys.getFirst().length);
@@ -599,8 +617,13 @@ public class HBaseTablespace extends Tablespace {
           serverLoadMap.put(location.getServerName(), serverLoad);
         }
 
-        HBaseFragment fragment = new HBaseFragment(tableDesc.getName(), htable.getName().getNameAsString(),
-            location.getRegionInfo().getStartKey(), location.getRegionInfo().getEndKey(), location.getHostname());
+        HBaseFragment fragment = new HBaseFragment(
+            tableDesc.getUri(),
+            tableDesc.getName(),
+            htable.getName().getNameAsString(),
+            location.getRegionInfo().getStartKey(),
+            location.getRegionInfo().getEndKey(),
+            location.getHostname());
 
         // get region size
         boolean foundLength = false;
@@ -642,7 +665,7 @@ public class HBaseTablespace extends Tablespace {
     }
   }
 
-  public HConnection getConnection(Configuration hbaseConf) throws IOException {
+  public HConnection getConnection() throws IOException {
     synchronized(connMap) {
       HConnectionKey key = new HConnectionKey(hbaseConf);
       HConnection conn = connMap.get(key);
@@ -937,18 +960,17 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException {
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId,
+                          LogicalPlan plan, Schema schema,
+                          TableDesc tableDesc) throws IOException {
     if (tableDesc == null) {
       throw new IOException("TableDesc is null while calling loadIncrementalHFiles: " + finalEbId);
     }
-    Preconditions.checkArgument(tableDesc.getName() != null && tableDesc.getPath() == null);
 
     Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
 
-    Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
+    Configuration hbaseConf = HBaseConfiguration.create(this.hbaseConf);
     hbaseConf.set("hbase.loadincremental.threads.max", "2");
 
     JobContextImpl jobContext = new JobContextImpl(hbaseConf,
@@ -993,8 +1015,7 @@ public class HBaseTablespace extends Tablespace {
         sortKeyIndexes[i] = inputSchema.getColumnId(sortSpecs[i].getSortKey().getQualifiedName());
       }
 
-      ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-      Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(queryContext.getConf(), tableDesc.getMeta());
+      ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta().getOptions());
 
       HTable htable = new HTable(hbaseConf, columnMapping.getHbaseTableName());
       try {
@@ -1062,59 +1083,45 @@ public class HBaseTablespace extends Tablespace {
     }
   }
 
-  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException {
-    if ("false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"))) {
-      List<LogicalPlanRewriteRule> rules = new ArrayList<LogicalPlanRewriteRule>();
-      rules.add(new AddSortForInsertRewriter(tableDesc, getIndexColumns(tableDesc)));
-      return rules;
-    } else {
-      return null;
+  @Override
+  public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException {
+    if (REWRITE_RULE.isEligible(context, plan)) {
+      REWRITE_RULE.rewrite(context, plan);
     }
   }
 
-  private Column[] getIndexColumns(TableDesc tableDesc) throws IOException {
-    List<Column> indexColumns = new ArrayList<Column>();
-
-    ColumnMapping columnMapping = new ColumnMapping(tableDesc.getSchema(), tableDesc.getMeta());
-
-    boolean[] isRowKeys = columnMapping.getIsRowKeyMappings();
-    for (int i = 0; i < isRowKeys.length; i++) {
-      if (isRowKeys[i]) {
-        indexColumns.add(tableDesc.getSchema().getColumn(i));
-      }
-    }
-
-    return indexColumns.toArray(new Column[]{});
+  @Override
+  public StorageProperty getProperty() {
+    return HBASE_STORAGE_PROPERTIES;
   }
 
   @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty storageProperty = new StorageProperty();
-    storageProperty.setSortedInsert(true);
-    storageProperty.setSupportsInsertInto(true);
-    return storageProperty;
+  public FormatProperty getFormatProperty(String format) {
+    return HFILE_FORMAT_PROPERTIES;
   }
 
-  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+  public void prepareTable(LogicalNode node) throws IOException {
     if (node.getType() == NodeType.CREATE_TABLE) {
       CreateTableNode cNode = (CreateTableNode)node;
       if (!cNode.isExternal()) {
         TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
-        createTable(tableMeta, cNode.getTableSchema(), cNode.isExternal(), cNode.isIfNotExists());
+        createTable(
+            ((CreateTableNode) node).getUri(), tableMeta, cNode.getTableSchema(),
+            cNode.isExternal(), cNode.isIfNotExists());
       }
     }
   }
 
   @Override
-  public void rollbackOutputCommit(LogicalNode node) throws IOException {
+  public void rollbackTable(LogicalNode node) throws IOException {
     if (node.getType() == NodeType.CREATE_TABLE) {
       CreateTableNode cNode = (CreateTableNode)node;
       if (cNode.isExternal()) {
         return;
       }
-      TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
-      HBaseAdmin hAdmin =  new HBaseAdmin(getHBaseConfiguration(conf, tableMeta));
 
+      HBaseAdmin hAdmin =  new HBaseAdmin(this.hbaseConf);
+      TableMeta tableMeta = new TableMeta(cNode.getStorageType(), cNode.getOptions());
       try {
         HTableDescriptor hTableDesc = parseHTableDescriptor(tableMeta, cNode.getTableSchema());
         LOG.info("Delete table cause query failed:" + new String(hTableDesc.getName()));
@@ -1127,7 +1134,7 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException  {
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException  {
     if (tableDesc != null) {
       Schema tableSchema = tableDesc.getSchema();
       if (tableSchema.size() != outSchema.size()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
new file mode 100644
index 0000000..ebf557e
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/SortedInsertRewriter.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
+import org.apache.tajo.util.KeyValueSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Rewrite rule that injects a sort below the root CREATE TABLE / INSERT node so that rows are
+ * written in ascending order of the HBase row keys, as required by HFile.
+ */
+public class SortedInsertRewriter implements LogicalPlanRewriteRule {
+
+  @Override
+  public String getName() {
+    return "SortedInsertRewriter";
+  }
+
+  /**
+   * Eligible only when put mode is off and the root's child is a CREATE TABLE or an INSERT node.
+   * The node type test is parenthesized so that an INSERT issued in put mode is not matched.
+   */
+  @Override
+  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
+    boolean sortedInsert = "false".equalsIgnoreCase(queryContext.get(HBaseStorageConstants.INSERT_PUT_MODE, "false"));
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    LogicalNode node = rootNode.getChild();
+    return sortedInsert && (node.getType() == NodeType.CREATE_TABLE || node.getType() == NodeType.INSERT);
+  }
+
+  /**
+   * Returns the columns of the given schema that are mapped to the HBase row key.
+   *
+   * @param tableSchema Schema of the target table
+   * @param tableProperty Table properties holding the column mapping
+   * @return Row key columns, in schema order
+   * @throws IOException if thrown while building the {@link ColumnMapping}
+   */
+  public static Column[] getIndexColumns(Schema tableSchema, KeyValueSet tableProperty) throws IOException {
+    boolean[] isRowKeys = new ColumnMapping(tableSchema, tableProperty).getIsRowKeyMappings();
+    List<Column> indexColumns = new ArrayList<Column>();
+    for (int i = 0; i < isRowKeys.length; i++) {
+      if (isRowKeys[i]) {
+        indexColumns.add(tableSchema.getColumn(i));
+      }
+    }
+    return indexColumns.toArray(new Column[]{});
+  }
+
+  /** Places a storage-specified sort between the root's child and that node's child. */
+  @Override
+  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
+    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+    StoreTableNode storeTable = rootNode.getChild();
+    Schema tableSchema = storeTable.getTableSchema();
+
+    Column[] sortColumns;
+    try {
+      sortColumns = getIndexColumns(tableSchema, storeTable.getOptions());
+    } catch (IOException e) {
+      throw new PlanningException(e);
+    }
+
+    LogicalNode childNode = storeTable.getChild();
+    Schema sortSchema = childNode.getOutSchema();
+    SortNode sortNode = plan.createNode(SortNode.class);
+    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
+    sortNode.setInSchema(sortSchema);
+    sortNode.setOutSchema(sortSchema);
+
+    // Row key columns are looked up in the child's output by their position in the table schema.
+    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
+    for (int i = 0; i < sortColumns.length; i++) {
+      int columnId = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
+      Column sortColumn = sortSchema.getColumn(columnId);
+      if (sortColumn == null) {
+        throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
+      }
+      sortSpecs[i] = new SortSpec(sortColumn, true, true);
+    }
+    sortNode.setSortSpecs(sortSpecs);
+
+    sortNode.setChild(childNode);
+    storeTable.setChild(sortNode);
+    plan.getRootBlock().registerNode(sortNode);
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
index 668b116..33d45b3 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
+++ b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto
@@ -25,11 +25,12 @@ option java_generate_equals_and_hash = true;
 import "CatalogProtos.proto";
 
 message HBaseFragmentProto {
-  required string tableName = 1;
-  required string hbaseTableName = 2;
-  required bytes startRow = 3;
-  required bytes stopRow = 4;
-  required bool last = 5;
-  required int64 length = 6;
-  optional string regionLocation = 7;
+  required string uri = 1;
+  required string tableName = 2;
+  required string hbaseTableName = 3;
+  required bytes startRow = 4;
+  required bytes stopRow = 5;
+  required bool last = 6;
+  required int64 length = 7;
+  optional string regionLocation = 8;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
index dd52324..5b1e2bd 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java
@@ -43,7 +43,7 @@ public class TestColumnMapping {
 
     TableMeta tableMeta = new TableMeta("HBASE", keyValueSet);
 
-    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta);
+    ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta.getOptions());
 
     List<String> cfNames = columnMapping.getColumnFamilyNames();
     assertEquals(2, cfNames.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
deleted file mode 100644
index b59fe7b..0000000
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.TextDatum;
-import org.apache.tajo.plan.expr.*;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.TableSpaceManager;
-import org.apache.tajo.util.Pair;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class TestHBaseStorageManager {
-  @Test
-  public void testGetIndexPredications() throws Exception {
-    Column rowkeyColumn = new Column("rk", Type.TEXT);
-    // where rk >= '020' and rk <= '055'
-    ScanNode scanNode = new ScanNode(1);
-    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
-    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
-    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
-    scanNode.setQual(evalNodeA);
-
-    HBaseTablespace storageManager =
-        (HBaseTablespace) TableSpaceManager.getStorageManager(new TajoConf(), "HBASE");
-    List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertNotNull(indexEvals);
-    assertEquals(1, indexEvals.size());
-    Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    // where (rk >= '020' and rk <= '055') or rk = '075'
-    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075")));
-    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
-    scanNode.setQual(evalNodeB);
-    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertEquals(2, indexEvals.size());
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
-    assertEquals("075", indexPredicateValue.getFirst().asChars());
-    assertEquals("075", indexPredicateValue.getSecond().asChars());
-
-    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
-    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
-    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
-    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
-    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
-    scanNode.setQual(evalNodeD);
-    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertEquals(2, indexEvals.size());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
-    assertEquals("072", indexPredicateValue.getFirst().asChars());
-    assertEquals("078", indexPredicateValue.getSecond().asChars());
-
-    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')
-    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
-    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
-    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
-    EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
-    evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
-    EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
-    scanNode.setQual(evalNodeE);
-    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
-    assertEquals(2, indexEvals.size());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
-    assertEquals("020", indexPredicateValue.getFirst().asChars());
-    assertEquals("055", indexPredicateValue.getSecond().asChars());
-
-    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
-    assertEquals("073", indexPredicateValue.getFirst().asChars());
-    assertEquals("078", indexPredicateValue.getSecond().asChars());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
new file mode 100644
index 0000000..f7cbb5a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.hbase;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.plan.expr.*;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.util.Pair;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestHBaseTableSpace { // unit tests for HBaseTablespace: quorum extraction, tablespace lookup, rowkey predicate extraction
+  @BeforeClass
+  public static void setUp() throws IOException { // runs once: creates the "cluster1" HBase tablespace and registers it for the tests below
+    String tableSpaceUri = "hbase:zk://host1:2171";
+    HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
+    hBaseTablespace.init(new TajoConf());
+    TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+  }
+
+  @Test
+  public void testExtractQuorum() { // expected quorum is the host:port list only; a trailing "/table1" path is not part of it
+    assertEquals("host1:2171", HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171")));
+    assertEquals("host1:2171", HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171/table1")));
+    assertEquals("host1:2171,host2:2172",
+        HBaseTablespace.extractQuorum(URI.create("hbase:zk://host1:2171,host2:2172/table1")));
+  }
+
+  @Test
+  public void testTablespaceHandler() throws Exception { // the tablespace registered in setUp must resolve both by name and by URI
+    assertTrue((TableSpaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
+    assertTrue((TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get())
+        instanceof HBaseTablespace);
+  }
+
+  @Test
+  public void testGetIndexPredications() throws Exception { // each case sets a qual on the scan node and checks the extracted (first, second) value pairs
+    Column rowkeyColumn = new Column("rk", Type.TEXT);
+    // Case 1: where rk >= '020' and rk <= '055'  => expect one predicate set with values ('020', '055')
+    ScanNode scanNode = new ScanNode(1);
+    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020")));
+    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055")));
+    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
+    scanNode.setQual(evalNodeA);
+
+    HBaseTablespace storageManager = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertNotNull(indexEvals);
+    assertEquals(1, indexEvals.size());
+    Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    // Case 2: where (rk >= '020' and rk <= '055') or rk = '075'  => expect two sets; the equality yields ('075', '075')
+    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075")));
+    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
+    scanNode.setQual(evalNodeB);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("075", indexPredicateValue.getFirst().asChars());
+    assertEquals("075", indexPredicateValue.getSecond().asChars());
+
+    // Case 3: where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')  => expect ('020', '055') and ('072', '078')
+    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
+    scanNode.setQual(evalNodeD);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("072", indexPredicateValue.getFirst().asChars());
+    assertEquals("078", indexPredicateValue.getSecond().asChars());
+
+    // Case 4: where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073')  => second set expects ('073', '078'), i.e. the larger GEQ constant
+    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072")));
+    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078")));
+    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
+    EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073")));
+    evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6);
+    EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD);
+    scanNode.setQual(evalNodeE);
+    indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
+    assertEquals(2, indexEvals.size());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0));
+    assertEquals("020", indexPredicateValue.getFirst().asChars());
+    assertEquals("055", indexPredicateValue.getSecond().asChars());
+
+    indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1));
+    assertEquals("073", indexPredicateValue.getFirst().asChars());
+    assertEquals("078", indexPredicateValue.getSecond().asChars());
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml
index 3a59ec9..9b98b0d 100644
--- a/tajo-storage/tajo-storage-hdfs/pom.xml
+++ b/tajo-storage/tajo-storage-hdfs/pom.xml
@@ -74,6 +74,8 @@
         <configuration>
           <excludes>
             <exclude>src/test/resources/dataset/**</exclude>
+            <exclude>src/main/resources/*.json</exclude>
+            <exclude>src/test/resources/*.json</exclude>
           </excludes>
         </configuration>
       </plugin>
@@ -350,10 +352,6 @@
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
     </dependency>
-    <dependency>
-      <groupId>net.minidev</groupId>
-      <artifactId>json-smart</artifactId>
-    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 5c8242f..081fa3f 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -51,20 +52,23 @@ public abstract class FileAppender implements Appender {
     this.workDir = workDir;
     this.taskAttemptId = taskAttemptId;
 
-    try {
-      if (taskAttemptId != null) {
-        if (!(conf instanceof TajoConf)) {
-          throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
-        }
 
-        this.path = ((FileTablespace) TableSpaceManager.getFileStorageManager((TajoConf) conf))
-            .getAppenderFilePath(taskAttemptId, workDir);
-      } else {
-        this.path = workDir;
+    if (taskAttemptId != null) {
+      if (!(conf instanceof TajoConf)) {
+        throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
       }
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-      throw new IllegalStateException("Error while opeining FileAppender: " + e.getMessage(), e);
+
+      Optional<FileTablespace> spaceResult = TableSpaceManager.get(workDir.toUri());
+
+      if (!spaceResult.isPresent()) {
+        throw new IllegalStateException("No TableSpace for " + workDir.toUri());
+      }
+
+      FileTablespace space = spaceResult.get();
+      this.path = space.getAppenderFilePath(taskAttemptId, workDir);
+
+    } else {
+      this.path = workDir;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 6ab8574..2ce1f09 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -35,18 +35,28 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.Bytes;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.net.URI;
 import java.text.NumberFormat;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT;
+
 public class FileTablespace extends Tablespace {
+
+  public static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
   private final Log LOG = LogFactory.getLog(FileTablespace.class);
 
   static final String OUTPUT_FILE_PREFIX="part-";
@@ -83,27 +93,54 @@ public class FileTablespace extends Tablespace {
       };
 
   protected FileSystem fs;
-  protected Path tableBaseDir;
+  protected Path basePath;
   protected boolean blocksMetadataEnabled;
   private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
 
-  public FileTablespace(String storeType) {
-    super(storeType);
+  public FileTablespace(String spaceName, URI uri) {
+    super(spaceName, uri);
   }
 
   @Override
   protected void storageInit() throws IOException {
-    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
-    this.fs = tableBaseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled)
+    this.basePath = new Path(uri);
+    this.fs = basePath.getFileSystem(conf);
+    this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());
+
+    this.blocksMetadataEnabled =
+        conf.getBoolean(DFS_HDFS_BLOCKS_METADATA_ENABLED, DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+
+    if (!this.blocksMetadataEnabled) {
       LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+    }
+  }
+
+  @Override
+  public void setConfig(String name, String value) {
+    conf.set(name, value);
+  }
+
+  @Override
+  public void setConfigs(Map<String, String> configs) {
+    for (Map.Entry<String, String> c : configs.entrySet()) {
+      conf.set(c.getKey(), c.getValue());
+    }
+  }
+
+  @Override
+  public long getTableVolume(URI uri) throws IOException {
+    Path path = new Path(uri);
+    ContentSummary summary = fs.getContentSummary(path);
+    return summary.getLength();
+  }
+
+  @Override
+  public URI getRootUri() {
+    return fs.getUri();
   }
 
   public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
       throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
     FileStatus status = fs.getFileStatus(path);
     return getFileScanner(meta, schema, path, status);
   }
@@ -128,8 +165,9 @@ public class FileTablespace extends Tablespace {
     return fileSystem.exists(path);
   }
 
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
+  @Override
+  public URI getTableUri(String databaseName, String tableName) {
+    return StorageUtil.concatPath(basePath, databaseName, tableName).toUri();
   }
 
   private String partitionPath = "";
@@ -154,12 +192,12 @@ public class FileTablespace extends Tablespace {
   }
 
   public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
+    Path tablePath = new Path(basePath, tableName);
     return split(tableName, tablePath, fs.getDefaultBlockSize());
   }
 
   public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
+    Path tablePath = new Path(basePath, tableName);
     return split(tableName, tablePath, fragmentSize);
   }
 
@@ -314,7 +352,6 @@ public class FileTablespace extends Tablespace {
     for (int i = 0; i < dirs.length; ++i) {
       Path p = dirs[i];
 
-      FileSystem fs = p.getFileSystem(conf);
       FileStatus[] matches = fs.globStatus(p, inputFilter);
       if (matches == null) {
         errors.add(new IOException("Input path does not exist: " + p));
@@ -323,8 +360,7 @@ public class FileTablespace extends Tablespace {
       } else {
         for (FileStatus globStat : matches) {
           if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
+            for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
               result.add(stat);
             }
           } else {
@@ -492,8 +528,6 @@ public class FileTablespace extends Tablespace {
     List<BlockLocation> blockLocations = Lists.newArrayList();
 
     for (Path p : inputs) {
-      FileSystem fs = p.getFileSystem(conf);
-
       ArrayList<FileStatus> files = Lists.newArrayList();
       if (fs.isFile(p)) {
         files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
@@ -586,7 +620,7 @@ public class FileTablespace extends Tablespace {
       return;
     }
 
-    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+    DistributedFileSystem fs = (DistributedFileSystem) this.fs;
     int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
     int blockLocationIdx = 0;
 
@@ -629,7 +663,7 @@ public class FileTablespace extends Tablespace {
 
   @Override
   public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException {
-    return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath()));
+    return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getUri()));
   }
 
   @Override
@@ -640,13 +674,13 @@ public class FileTablespace extends Tablespace {
       String simpleTableName = splitted[1];
 
       // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
-      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
-      tableDesc.setPath(tablePath.toUri());
+      Path tablePath = StorageUtil.concatPath(basePath, databaseName, simpleTableName);
+      tableDesc.setUri(tablePath.toUri());
     } else {
-      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
+      Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
     }
 
-    Path path = new Path(tableDesc.getPath());
+    Path path = new Path(tableDesc.getUri());
 
     FileSystem fs = path.getFileSystem(conf);
     TableStats stats = new TableStats();
@@ -679,7 +713,7 @@ public class FileTablespace extends Tablespace {
   @Override
   public void purgeTable(TableDesc tableDesc) throws IOException {
     try {
-      Path path = new Path(tableDesc.getPath());
+      Path path = new Path(tableDesc.getUri());
       FileSystem fs = path.getFileSystem(conf);
       LOG.info("Delete table data dir: " + path);
       fs.delete(path, true);
@@ -692,7 +726,7 @@ public class FileTablespace extends Tablespace {
   public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException {
     // Listing table data file which is not empty.
     // If the table is a partitioned table, return file list which has same partition key.
-    Path tablePath = new Path(tableDesc.getPath());
+    Path tablePath = new Path(tableDesc.getUri());
     FileSystem fs = tablePath.getFileSystem(conf);
 
     //In the case of partitioned table, we should return same partition key data files.
@@ -704,7 +738,7 @@ public class FileTablespace extends Tablespace {
     List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
     if (fs.exists(tablePath)) {
       if (!partitionPath.isEmpty()) {
-        Path partPath = new Path(tableDesc.getPath() + partitionPath);
+        Path partPath = new Path(tableDesc.getUri() + partitionPath);
         if (fs.exists(partPath)) {
           getNonZeroLengthDataFiles(fs, partPath, nonZeroLengthFiles, currentPage, numResultFragments,
                   new AtomicInteger(0), tableDesc.hasPartition(), this.currentDepth, partitionDepth);
@@ -768,7 +802,7 @@ public class FileTablespace extends Tablespace {
     // Intermediate directory
     if (fs.isDirectory(path)) {
 
-      FileStatus[] files = fs.listStatus(path, Tablespace.hiddenFileFilter);
+      FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
 
       if (files != null && files.length > 0) {
 
@@ -817,44 +851,43 @@ public class FileTablespace extends Tablespace {
     }
   }
 
-  @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty storageProperty = new StorageProperty();
-    storageProperty.setSortedInsert(false);
-    if (storeType.equalsIgnoreCase("RAW")) {
-      storageProperty.setSupportsInsertInto(false);
-    } else {
-      storageProperty.setSupportsInsertInto(true);
-    }
+  private static final StorageProperty FileStorageProperties = new StorageProperty(true, true, true, true);
+  private static final FormatProperty GeneralFileProperties = new FormatProperty(false);
+  private static final FormatProperty HFileProperties = new FormatProperty(true);
 
-    return storageProperty;
+  @Override
+  public StorageProperty getProperty() {
+    return FileStorageProperties;
   }
 
   @Override
-  public void close() {
+  public FormatProperty getFormatProperty(String format) {
+    if (format.equalsIgnoreCase("hbase")) {
+      return HFileProperties;
+    } else {
+      return GeneralFileProperties;
+    }
   }
 
   @Override
-  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+  public void close() {
   }
 
   @Override
-  public void rollbackOutputCommit(LogicalNode node) throws IOException {
+  public void prepareTable(LogicalNode node) throws IOException {
   }
 
   @Override
-  public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException {
+  public void rollbackTable(LogicalNode node) throws IOException {
   }
 
   @Override
-  public List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
-      throws IOException {
-    return null;
+  public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
   }
 
   @Override
-  public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
-                               Schema schema, TableDesc tableDesc) throws IOException {
+  public Path commitTable(OverridableConf queryContext, ExecutionBlockId finalEbId, LogicalPlan plan,
+                          Schema schema, TableDesc tableDesc) throws IOException {
     return commitOutputData(queryContext, true);
   }
 
@@ -879,8 +912,8 @@ public class FileTablespace extends Tablespace {
     Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR));
     Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
     Path finalOutputDir;
-    if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) {
-      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH));
+    if (!queryContext.get(QueryVars.OUTPUT_TABLE_URI, "").isEmpty()) {
+      finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_URI));
       try {
         FileSystem fs = stagingResultDir.getFileSystem(conf);
 
@@ -949,7 +982,7 @@ public class FileTablespace extends Tablespace {
               if (fs.exists(finalOutputDir)) {
                 fs.mkdirs(oldTableDir);
 
-                for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+                for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
                   fs.rename(status.getPath(), oldTableDir);
                 }
 
@@ -971,7 +1004,7 @@ public class FileTablespace extends Tablespace {
               if (movedToOldTable && !committed) {
 
                 // if commit is failed, recover the old data
-                for (FileStatus status : fs.listStatus(finalOutputDir, Tablespace.hiddenFileFilter)) {
+                for (FileStatus status : fs.listStatus(finalOutputDir, hiddenFileFilter)) {
                   fs.delete(status.getPath(), true);
                 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 66c7f13..49485f5 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -82,8 +82,9 @@ public class HashShuffleAppenderManager {
         if (!fs.exists(dataFile.getParent())) {
           fs.mkdirs(dataFile.getParent());
         }
-        FileAppender appender = (FileAppender)((FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf))
-            .getAppender(meta, outSchema, dataFile);
+
+        FileTablespace space = (FileTablespace) TableSpaceManager.get(dataFile.toUri()).get();
+        FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile);
         appender.enableStats();
         appender.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 75ad0d5..ab63d55 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -122,7 +122,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf)).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -154,7 +154,7 @@ public class TestCompressionStorages {
     FileFragment[] tablets = new FileFragment[1];
     tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
 
-    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
 
     if (storeType.equalsIgnoreCase("CSV")) {
       if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 9726ecc..2d919cd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -103,7 +103,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple tuple;
@@ -125,7 +125,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertNotNull(scanner.next());
@@ -147,7 +147,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
     FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
-    Scanner scanner =  TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {
@@ -166,7 +166,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment = getFileFragment("testErrorTolerance3.json");
-    Scanner scanner = TableSpaceManager.getFileStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
deleted file mode 100644
index a6d6077..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileStorageManager.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.*;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-public class TestFileStorageManager {
-	private TajoConf conf;
-	private static String TEST_PATH = "target/test-data/TestFileStorageManager";
-  private Path testDir;
-  private FileSystem fs;
-
-	@Before
-	public void setUp() throws Exception {
-		conf = new TajoConf();
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-	}
-
-	@After
-	public void tearDown() throws Exception {
-	}
-
-  @Test
-	public final void testGetScannerAndAppender() throws IOException {
-		Schema schema = new Schema();
-		schema.addColumn("id", Type.INT4);
-		schema.addColumn("age",Type.INT4);
-		schema.addColumn("name",Type.TEXT);
-
-		TableMeta meta = CatalogUtil.newTableMeta("CSV");
-		
-		VTuple[] tuples = new VTuple[4];
-		for(int i=0; i < tuples.length; i++) {
-		  tuples[i] = new VTuple(new Datum[] {
-          DatumFactory.createInt4(i),
-		      DatumFactory.createInt4(i + 32),
-		      DatumFactory.createText("name" + i)});
-		}
-
-    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
-    fs.mkdirs(path.getParent());
-    FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
-    assertEquals(fs.getUri(), fileStorageManager.getFileSystem().getUri());
-
-		Appender appender = fileStorageManager.getAppender(meta, schema, path);
-    appender.init();
-		for(Tuple t : tuples) {
-		  appender.addTuple(t);
-		}
-		appender.close();
-
-		Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
-    scanner.init();
-		int i=0;
-		while(scanner.next() != null) {
-			i++;
-		}
-		assertEquals(4,i);
-	}
-
-  @Test
-  public void testGetSplit() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(1).build();
-    cluster.waitClusterUp();
-    TajoConf tajoConf = new TajoConf(conf);
-    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
-    int testCount = 10;
-    Path tablePath = new Path("/testGetSplit");
-    try {
-      DistributedFileSystem fs = cluster.getFileSystem();
-
-      // Create test partitions
-      List<Path> partitions = Lists.newArrayList();
-      for (int i =0; i < testCount; i++){
-        Path tmpFile = new Path(tablePath, String.valueOf(i));
-        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
-        partitions.add(tmpFile);
-      }
-
-      assertTrue(fs.exists(tablePath));
-      FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf);
-      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
-      Schema schema = new Schema();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("age",Type.INT4);
-      schema.addColumn("name",Type.TEXT);
-      TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
-      List<Fragment> splits = Lists.newArrayList();
-      // Get FileFragments in partition batch
-      splits.addAll(sm.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
-      assertEquals(testCount, splits.size());
-      // -1 is unknown volumeId
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-
-      splits.clear();
-      splits.addAll(sm.getSplits("data", meta, schema,
-          partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
-      assertEquals(testCount / 2, splits.size());
-      assertEquals(1, splits.get(0).getHosts().length);
-      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-      fs.close();
-    } finally {
-      cluster.shutdown(true);
-    }
-  }
-
-  @Test
-  public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
-        .numDataNodes(2).build();
-    cluster.waitClusterUp();
-
-    TajoConf tajoConf = new TajoConf(conf);
-    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
-    int testCount = 10;
-    Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
-    try {
-      DistributedFileSystem fs = cluster.getFileSystem();
-
-      // Create test files
-      for (int i = 0; i < testCount; i++) {
-        Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
-        DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
-      }
-      assertTrue(fs.exists(tablePath));
-      FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(tajoConf);
-      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
-      Schema schema = new Schema();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("age", Type.INT4);
-      schema.addColumn("name", Type.TEXT);
-      TableMeta meta = CatalogUtil.newTableMeta("CSV");
-
-      List<Fragment> splits = Lists.newArrayList();
-      splits.addAll(sm.getSplits("data", meta, schema, tablePath));
-
-      assertEquals(testCount, splits.size());
-      assertEquals(2, splits.get(0).getHosts().length);
-      assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
-      assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
-      fs.close();
-    } finally {
-      cluster.shutdown(true);
-    }
-  }
-
-  @Test
-  public void testStoreType() throws Exception {
-    final Configuration hdfsConf = new HdfsConfiguration();
-    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
-    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
-    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
-    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
-
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf)
-        .numDataNodes(2).build();
-    cluster.waitClusterUp();
-
-    TajoConf tajoConf = new TajoConf(hdfsConf);
-    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
-
-    try {
-      /* Local FileSystem */
-      FileTablespace sm = (FileTablespace) TableSpaceManager.getStorageManager(conf, "CSV");
-      assertEquals(fs.getUri(), sm.getFileSystem().getUri());
-
-      /* Distributed FileSystem */
-      sm = (FileTablespace) TableSpaceManager.getStorageManager(tajoConf, "CSV");
-      assertNotEquals(fs.getUri(), sm.getFileSystem().getUri());
-      assertEquals(cluster.getFileSystem().getUri(), sm.getFileSystem().getUri());
-    } finally {
-      cluster.shutdown(true);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a6c238b..9237e07 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -57,7 +57,7 @@ public class TestFileSystems {
   public TestFileSystems(FileSystem fs) throws IOException {
     this.fs = fs;
     this.conf = new TajoConf(fs.getConf());
-    sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+    sm = TableSpaceManager.getLocalFs();
     testDir = getTestDir(this.fs, TEST_PATH);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
new file mode 100644
index 0000000..ec3e143
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.*;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+public class TestFileTablespace {
+	private TajoConf conf;
+	private static String TEST_PATH = "target/test-data/TestFileTablespace";
+  private Path testDir;
+  private FileSystem localFs;
+
+	@Before
+	public void setUp() throws Exception {
+		conf = new TajoConf();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    localFs = testDir.getFileSystem(conf);
+	}
+
+	@After
+	public void tearDown() throws Exception {
+	}
+
+  @Test
+	public final void testGetScannerAndAppender() throws IOException {
+		Schema schema = new Schema();
+		schema.addColumn("id", Type.INT4);
+		schema.addColumn("age",Type.INT4);
+		schema.addColumn("name",Type.TEXT);
+
+		TableMeta meta = CatalogUtil.newTableMeta("CSV");
+		
+		VTuple[] tuples = new VTuple[4];
+		for(int i=0; i < tuples.length; i++) {
+		  tuples[i] = new VTuple(new Datum[] {
+          DatumFactory.createInt4(i),
+		      DatumFactory.createInt4(i + 32),
+		      DatumFactory.createText("name" + i)});
+		}
+
+    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
+    localFs.mkdirs(path.getParent());
+    FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getLocalFs();
+    assertEquals(localFs.getUri(), fileStorageManager.getFileSystem().getUri());
+
+		Appender appender = fileStorageManager.getAppender(meta, schema, path);
+    appender.init();
+		for(Tuple t : tuples) {
+		  appender.addTuple(t);
+		}
+		appender.close();
+
+		Scanner scanner = fileStorageManager.getFileScanner(meta, schema, path);
+    scanner.init();
+		int i=0;
+		while(scanner.next() != null) {
+			i++;
+		}
+		assertEquals(4,i);
+	}
+
+  @Test
+  public void testGetSplit() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, false);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitClusterUp();
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+    int testCount = 10;
+    Path tablePath = new Path("/testGetSplit");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create test partitions
+      List<Path> partitions = Lists.newArrayList();
+      for (int i =0; i < testCount; i++){
+        Path tmpFile = new Path(tablePath, String.valueOf(i));
+        DFSTestUtil.createFile(fs, new Path(tmpFile, "tmpfile.dat"), 10, (short) 2, 0xDEADDEADl);
+        partitions.add(tmpFile);
+      }
+
+      assertTrue(fs.exists(tablePath));
+      FileTablespace space = new FileTablespace("testGetSplit", fs.getUri());
+      space.init(new TajoConf(conf));
+      assertEquals(fs.getUri(), space.getUri());
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age",Type.INT4);
+      schema.addColumn("name",Type.TEXT);
+      TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+      List<Fragment> splits = Lists.newArrayList();
+      // Get FileFragments in partition batch
+      splits.addAll(space.getSplits("data", meta, schema, partitions.toArray(new Path[partitions.size()])));
+      assertEquals(testCount, splits.size());
+      // -1 is unknown volumeId
+      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+
+      splits.clear();
+      splits.addAll(space.getSplits("data", meta, schema,
+          partitions.subList(0, partitions.size() / 2).toArray(new Path[partitions.size() / 2])));
+      assertEquals(testCount / 2, splits.size());
+      assertEquals(1, splits.get(0).getHosts().length);
+      assertEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+
+  @Test
+  public void testGetSplitWithBlockStorageLocationsBatching() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    conf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    TajoConf tajoConf = new TajoConf(conf);
+    tajoConf.setVar(TajoConf.ConfVars.ROOT_DIR, cluster.getFileSystem().getUri() + "/tajo");
+
+    int testCount = 10;
+    Path tablePath = new Path("/testGetSplitWithBlockStorageLocationsBatching");
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      // Create test files
+      for (int i = 0; i < testCount; i++) {
+        Path tmpFile = new Path(tablePath, "tmpfile" + i + ".dat");
+        DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
+      }
+      assertTrue(fs.exists(tablePath));
+
+      FileTablespace sm = new FileTablespace("testGetSplitWithBlockStorageLocationsBatching", fs.getUri());
+      sm.init(new TajoConf(conf));
+
+      assertEquals(fs.getUri(), sm.getUri());
+
+      Schema schema = new Schema();
+      schema.addColumn("id", Type.INT4);
+      schema.addColumn("age", Type.INT4);
+      schema.addColumn("name", Type.TEXT);
+      TableMeta meta = CatalogUtil.newTableMeta("CSV");
+
+      List<Fragment> splits = Lists.newArrayList();
+      splits.addAll(sm.getSplits("data", meta, schema, tablePath));
+
+      assertEquals(testCount, splits.size());
+      assertEquals(2, splits.get(0).getHosts().length);
+      assertEquals(2, ((FileFragment)splits.get(0)).getDiskIds().length);
+      assertNotEquals(-1, ((FileFragment)splits.get(0)).getDiskIds()[0]);
+      fs.close();
+    } finally {
+      cluster.shutdown(true);
+    }
+  }
+
+  @Test
+  public void testGetFileTablespace() throws Exception {
+    final Configuration hdfsConf = new HdfsConfiguration();
+    String testDataPath = TEST_PATH + "/" + UUID.randomUUID().toString();
+    hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, testDataPath);
+    hdfsConf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
+    hdfsConf.setBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, true);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(hdfsConf).numDataNodes(2).build();
+    cluster.waitClusterUp();
+    URI uri = URI.create(cluster.getFileSystem().getUri() + "/tajo");
+
+    Optional<Tablespace> existingTs = Optional.absent();
+    try {
+      /* Local FileSystem */
+      FileTablespace space = TableSpaceManager.getLocalFs();
+      assertEquals(localFs.getUri(), space.getFileSystem().getUri());
+
+      FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri);
+      distTablespace.init(conf);
+      existingTs = TableSpaceManager.addTableSpaceForTest(distTablespace);
+
+      /* Distributed FileSystem */
+      space = (FileTablespace) TableSpaceManager.get(uri).get();
+      assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
+
+      space = (FileTablespace) TableSpaceManager.getByName("testGetFileTablespace").get();
+      assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
+
+    } finally {
+
+      if (existingTs.isPresent()) {
+        TableSpaceManager.addTableSpaceForTest(existingTs.get());
+      }
+
+      cluster.shutdown(true);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index 266f906..c13ce16 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -65,7 +65,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -118,7 +118,7 @@ public class TestLineReader {
     meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) (TableSpaceManager.getLocalFs()).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -176,7 +176,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
 
     Path tablePath = new Path(testDir, "testLineDelimitedReader");
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -279,7 +279,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getFileStorageManager(conf).getAppender(
+    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 03a601d..79928ff 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -94,7 +94,7 @@ public class TestMergeScanner {
     conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = TableSpaceManager.getFileStorageManager(conf);
+    sm = TableSpaceManager.getLocalFs();
   }
 
   @Test
@@ -114,7 +114,7 @@ public class TestMergeScanner {
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table1Path);
+    Appender appender1 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
     appender1.enableStats();
     appender1.init();
     int tupleNum = 10000;
@@ -136,7 +136,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = TableSpaceManager.getFileStorageManager(conf).getAppender(null, null, meta, schema, table2Path);
+    Appender appender2 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
     appender2.enableStats();
     appender2.init();