You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:09:02 UTC

[10/15] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces. (missed commits)

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
new file mode 100644
index 0000000..26af769
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.Pair;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
+
+/**
+ * It handles available table spaces and cache TableSpace instances.
+ *
+ * Default tablespace must be a filesystem-based one.
+ * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
+ * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
+ */
+public class TablespaceManager implements StorageService {
+  private static final Log LOG = LogFactory.getLog(TablespaceManager.class);
+
+  public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
+  public static final String SITE_CONFIG_FILE = "storage-site.json";
+
+  /** default tablespace name */
+  public static final String DEFAULT_TABLESPACE_NAME = "default";
+
+  private final static TajoConf systemConf = new TajoConf();
+  private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
+
+  // The relation ship among name, URI, Tablespaces must be kept 1:1:1.
+  protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
+  protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
+
+  protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
+  protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
+
+  public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
+
+  static {
+    instance = new TablespaceManager();
+  }
+  /**
+   * Singleton instance
+   */
+  private static final TablespaceManager instance;
+
+  private TablespaceManager() {
+    initForDefaultConfig(); // loading storage-default.json
+    initSiteConfig();       // storage-site.json will override the configs of storage-default.json
+    addWarehouseAsSpace();  // adding a warehouse directory for a default tablespace
+    addLocalFsTablespace(); // adding a tablespace using local file system by default
+  }
+
+  private void addWarehouseAsSpace() {
+    Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
+    registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
+  }
+
+  private void addLocalFsTablespace() {
+    if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) {
+      String tmpName = UUID.randomUUID().toString();
+      registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
+    }
+  }
+
+  public static TablespaceManager getInstance() {
+    return instance;
+  }
+
+  private void initForDefaultConfig() {
+    JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
+    if (json == null) {
+      throw new IllegalStateException("There is no " + SITE_CONFIG_FILE);
+    }
+    applyConfig(json, false);
+  }
+
+  private void initSiteConfig() {
+    JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
+
+    // if there is no storage-site.json file, nothing happen.
+    if (json != null) {
+      applyConfig(json, true);
+    }
+  }
+
+  private JSONObject loadFromConfig(String fileName) {
+    String json;
+    try {
+      json = FileUtil.readTextFileFromResource(fileName);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (json != null) {
+      return parseJson(json);
+    } else {
+      return null;
+    }
+  }
+
+  private static JSONObject parseJson(String json) {
+    try {
+      return (JSONObject) parser.parse(json);
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void applyConfig(JSONObject json, boolean override) {
+    loadStorages(json);
+    loadTableSpaces(json, override);
+  }
+
+  private void loadStorages(JSONObject json) {
+    JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
+
+    if (spaces != null) {
+      Pair<String, Class<? extends Tablespace>> pair = null;
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+
+        try {
+          pair = extractStorage(entry);
+        } catch (ClassNotFoundException e) {
+          LOG.warn(e);
+          continue;
+        }
+
+        TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
+      }
+    }
+  }
+
+  private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
+      throws ClassNotFoundException {
+
+    String storageType = entry.getKey();
+    JSONObject storageDesc = (JSONObject) entry.getValue();
+    String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
+
+    return new Pair<String, Class<? extends Tablespace>>(
+        storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
+  }
+
+  private void loadTableSpaces(JSONObject json, boolean override) {
+    JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
+
+    if (spaces != null) {
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+        AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
+      }
+    }
+  }
+
+  public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
+    boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
+    URI spaceUri = URI.create(spaceDesc.getAsString("uri"));
+
+    if (defaultSpace) {
+      registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
+    }
+    registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
+  }
+
+  private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
+                                         boolean visible, boolean override) {
+    Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
+    tableSpace.setVisible(visible);
+
+    try {
+      tableSpace.init(systemConf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    putTablespace(tableSpace, override);
+
+    // If the arbitrary path is allowed, root uri is also added as a tablespace
+    if (tableSpace.getProperty().isArbitraryPathAllowed()) {
+      URI rootUri = tableSpace.getRootUri();
+      // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
+      if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
+        String tmpName = UUID.randomUUID().toString();
+        registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
+      }
+    }
+  }
+
+  private static void putTablespace(Tablespace space, boolean override) {
+    // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
+
+    boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
+    boolean uriExist = TABLE_SPACES.containsKey(space.uri);
+
+    boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
+    mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space);
+
+    if (!override && mismatch) {
+      throw new RuntimeException("Name or URI of Tablespace must be unique.");
+    }
+
+    SPACES_URIS_MAP.put(space.getName(), space.getUri());
+    // We must guarantee that the same uri results in the same tablespace instance.
+    TABLE_SPACES.put(space.getUri(), space);
+  }
+
+  /**
+   * Return length of the fragment.
+   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+   *
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return
+   */
+  public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
+    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+      return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+    } else {
+      return fragment.getLength();
+    }
+  }
+
+  public static final String KEY_STORAGES = "storages"; // storages
+  public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
+  public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
+
+  public static final String KEY_SPACES = "spaces";
+
+  private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
+    Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
+    Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
+
+    if (clazz == null) {
+      throw new RuntimeException("There is no tablespace for " + uri.toString());
+    }
+
+    try {
+      Constructor<? extends Tablespace> constructor =
+          (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
+
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
+        constructor.setAccessible(true);
+        CONSTRUCTORS.put(clazz, constructor);
+      }
+
+      return constructor.newInstance(new Object[]{spaceName, uri});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
+    Tablespace existing;
+    synchronized (SPACES_URIS_MAP) {
+      // Remove existing one
+      SPACES_URIS_MAP.remove(space.getName());
+      existing = TABLE_SPACES.remove(space.getUri());
+
+      // Add anotherone for test
+      registerTableSpace(space.name, space.uri, null, true, true);
+    }
+    // if there is an existing one, return it.
+    return Optional.fromNullable(existing);
+  }
+
+  public Iterable<String> getSupportSchemes() {
+    return TABLE_SPACE_HANDLERS.keySet();
+  }
+
+  /**
+   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+   *
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null, the default tablespace will be returned.
+   */
+  public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
+
+    if (uri == null || uri.isEmpty()) {
+      return (Optional<T>) Optional.of(getDefault());
+    }
+
+    Tablespace lastOne = null;
+
+    // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
+    // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
+    for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
+      if (uri.startsWith(entry.getKey().toString())) {
+        lastOne = entry.getValue();
+      }
+    }
+    return (Optional<T>) Optional.fromNullable(lastOne);
+  }
+
+  /**
+   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+   *
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null, the default tablespace will be returned.
+   */
+  public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
+    if (uri == null) {
+      return (Optional<T>) Optional.of(getDefault());
+    } else {
+      return (Optional<T>) get(uri.toString());
+    }
+  }
+
+  /**
+   * It returns the default tablespace. This method ensures that it always return the tablespace.
+   *
+   * @return
+   */
+  public static <T extends Tablespace> T getDefault() {
+    return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
+  }
+
+  public static <T extends Tablespace> T getLocalFs() {
+    return (T) get(LOCAL_FS_URI).get();
+  }
+
+  public static Optional<? extends Tablespace> getByName(String name) {
+    URI uri = SPACES_URIS_MAP.get(name);
+    if (uri != null) {
+      return Optional.of(TABLE_SPACES.get(uri));
+    } else {
+      return Optional.absent();
+    }
+  }
+
+  public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
+    for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
+      String uriScheme = entry.getKey().getScheme();
+      if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
+        return Optional.of(entry.getValue());
+      }
+    }
+
+    return Optional.absent();
+  }
+
+  @Override
+  public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
+    Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get();
+    return space.getTableUri(databaseName, tableName);
+  }
+
+  public static Iterable<Tablespace> getAllTablespaces() {
+    return TABLE_SPACES.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 916aae7..7943134 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
@@ -47,7 +47,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
   public void init() throws IOException {
     super.init();
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(uri).get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(uri).get();
     HConnection hconn = space.getConnection();
     htable = hconn.getTable(columnMapping.getHbaseTableName());
     htable.setAutoFlushTo(false);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 16f4c14..7369897 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -133,7 +133,7 @@ public class HBaseScanner implements Scanner {
     rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
     rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(fragment.getUri()).get();
     hbaseConf = space.getHbaseConf();
     initScanner();
   }
@@ -181,7 +181,7 @@ public class HBaseScanner implements Scanner {
     }
 
     if (htable == null) {
-      HConnection hconn = ((HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get()).getConnection();
+      HConnection hconn = ((HBaseTablespace) TablespaceManager.get(fragment.getUri()).get()).getConnection();
       htable = hconn.getTable(fragment.getHbaseTableName());
     }
     scanner = htable.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 5fac0cf..18bb7ed 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -51,10 +51,7 @@ import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.BytesUtils;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.*;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -68,9 +65,9 @@ import java.util.*;
 public class HBaseTablespace extends Tablespace {
   private static final Log LOG = LogFactory.getLog(HBaseTablespace.class);
 
-  public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty(false, true, true, false);
-
-  public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true);
+  public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty("hbase", false, true, false);
+  public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true);
+  public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false);
 
   private Configuration hbaseConf;
 
@@ -572,6 +569,15 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
+  public Appender getAppenderForInsertRow(OverridableConf queryContext,
+                                          TaskAttemptId taskAttemptId,
+                                          TableMeta meta,
+                                          Schema schema,
+                                          Path workDir) throws IOException {
+    return new HBasePutAppender(conf, uri, taskAttemptId, schema, meta, workDir);
+  }
+
+  @Override
   public Appender getAppender(OverridableConf queryContext,
                               TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
       throws IOException {
@@ -1096,8 +1102,14 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public FormatProperty getFormatProperty(String format) {
-    return HFILE_FORMAT_PROPERTIES;
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    KeyValueSet tableProperty = meta.getOptions();
+    if (tableProperty.isTrue(HBaseStorageConstants.INSERT_PUT_MODE) ||
+        tableProperty.isTrue(StorageConstants.INSERT_DIRECTLY)) {
+      return PUT_MODE_PROPERTIES;
+    } else {
+      return HFILE_FORMAT_PROPERTIES;
+    }
   }
 
   public void prepareTable(LogicalNode node) throws IOException {
@@ -1134,6 +1146,24 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    if (meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) {
+      throw new IOException("Staging phase is not supported in this storage.");
+    } else {
+      return TablespaceManager.getDefault().getStagingUri(context, queryId, meta);
+    }
+  }
+
+  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context,
+                                 TableMeta meta) throws IOException {
+    if (!meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) {
+      return TablespaceManager.getDefault().prepareStagingSpace(conf, queryId, context, meta);
+    } else {
+      throw new IOException("Staging phase is not supported in this storage.");
+    }
+  }
+
+  @Override
   public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException  {
     if (tableDesc != null) {
       Schema tableSchema = tableDesc.getSchema();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
index f7cbb5a..f0c8f15 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
@@ -25,7 +25,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.Pair;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -45,7 +45,7 @@ public class TestHBaseTableSpace {
     String tableSpaceUri = "hbase:zk://host1:2171";
     HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
     hBaseTablespace.init(new TajoConf());
-    TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+    TablespaceManager.addTableSpaceForTest(hBaseTablespace);
   }
 
   @Test
@@ -58,8 +58,8 @@ public class TestHBaseTableSpace {
 
   @Test
   public void testTablespaceHandler() throws Exception {
-    assertTrue((TableSpaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
-    assertTrue((TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get())
+    assertTrue((TablespaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
+    assertTrue((TablespaceManager.get(URI.create("hbase:zk://host1:2171")).get())
         instanceof HBaseTablespace);
   }
 
@@ -73,7 +73,7 @@ public class TestHBaseTableSpace {
     EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
     scanNode.setQual(evalNodeA);
 
-    HBaseTablespace storageManager = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HBaseTablespace storageManager = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
     List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
     assertNotNull(indexEvals);
     assertEquals(1, indexEvals.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 081fa3f..efe2bfd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -58,7 +58,7 @@ public abstract class FileAppender implements Appender {
         throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
       }
 
-      Optional<FileTablespace> spaceResult = TableSpaceManager.get(workDir.toUri());
+      Optional<FileTablespace> spaceResult = TablespaceManager.get(workDir.toUri());
 
       if (!spaceResult.isPresent()) {
         throw new IllegalStateException("No TableSpace for " + workDir.toUri());

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 2ce1f09..3b63012 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -19,14 +19,17 @@
 package org.apache.tajo.storage;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -92,8 +95,12 @@ public class FileTablespace extends Tablespace {
         }
       };
 
+  private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true);
+  private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true);
+
   protected FileSystem fs;
-  protected Path basePath;
+  protected Path spacePath;
+  protected Path stagingRootPath;
   protected boolean blocksMetadataEnabled;
   private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
 
@@ -103,8 +110,9 @@ public class FileTablespace extends Tablespace {
 
   @Override
   protected void storageInit() throws IOException {
-    this.basePath = new Path(uri);
-    this.fs = basePath.getFileSystem(conf);
+    this.spacePath = new Path(uri);
+    this.fs = spacePath.getFileSystem(conf);
+    this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR)));
     this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());
 
     this.blocksMetadataEnabled =
@@ -167,7 +175,7 @@ public class FileTablespace extends Tablespace {
 
   @Override
   public URI getTableUri(String databaseName, String tableName) {
-    return StorageUtil.concatPath(basePath, databaseName, tableName).toUri();
+    return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri();
   }
 
   private String partitionPath = "";
@@ -192,12 +200,12 @@ public class FileTablespace extends Tablespace {
   }
 
   public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(basePath, tableName);
+    Path tablePath = new Path(spacePath, tableName);
     return split(tableName, tablePath, fs.getDefaultBlockSize());
   }
 
   public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(basePath, tableName);
+    Path tablePath = new Path(spacePath, tableName);
     return split(tableName, tablePath, fragmentSize);
   }
 
@@ -491,30 +499,6 @@ public class FileTablespace extends Tablespace {
   }
 
   /**
-   * Generate the map of host and make them into Volume Ids.
-   *
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        if (diskIds.length > 0 && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-  /**
    * Generate the list of files and make them into FileSplits.
    *
    * @throws IOException
@@ -674,7 +658,7 @@ public class FileTablespace extends Tablespace {
       String simpleTableName = splitted[1];
 
       // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
-      Path tablePath = StorageUtil.concatPath(basePath, databaseName, simpleTableName);
+      Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName);
       tableDesc.setUri(tablePath.toUri());
     } else {
       Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
@@ -851,22 +835,14 @@ public class FileTablespace extends Tablespace {
     }
   }
 
-  private static final StorageProperty FileStorageProperties = new StorageProperty(true, true, true, true);
-  private static final FormatProperty GeneralFileProperties = new FormatProperty(false);
-  private static final FormatProperty HFileProperties = new FormatProperty(true);
-
   @Override
   public StorageProperty getProperty() {
     return FileStorageProperties;
   }
 
   @Override
-  public FormatProperty getFormatProperty(String format) {
-    if (format.equalsIgnoreCase("hbase")) {
-      return HFileProperties;
-    } else {
-      return GeneralFileProperties;
-    }
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    return GeneralFileProperties;
   }
 
   @Override
@@ -882,6 +858,84 @@ public class FileTablespace extends Tablespace {
   }
 
   @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
+
+    Path stagingDir;
+    // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
+    // So, this query results won't be materialized as a part of a table.
+    // The result will be temporarily written in the staging directory.
+    if (outputPath.isEmpty()) {
+      // for temporarily written in the storage directory
+      stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+    } else {
+      Optional<Tablespace> spaceResult = TablespaceManager.get(outputPath);
+      if (!spaceResult.isPresent()) {
+        throw new IOException("No registered Tablespace for " + outputPath);
+      }
+
+      Tablespace space = spaceResult.get();
+      if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
+        // If this space allows move operation, the staging directory will be underneath the final output table uri.
+        stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId));
+      } else {
+        stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+      }
+    }
+
+    return stagingDir.toUri();
+  }
+
+  // query submission directory is private!
+  final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx--------
+  public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+
+  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta)
+      throws IOException {
+
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi;
+    ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+
+    Path stagingDir = new Path(getStagingUri(context, queryId, meta));
+
+    ////////////////////////////////////////////
+    // Create Output Directory
+    ////////////////////////////////////////////
+
+    if (fs.exists(stagingDir)) {
+      throw new IOException("The staging directory '" + stagingDir + "' already exists");
+    }
+    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    FileStatus fsStatus = fs.getFileStatus(stagingDir);
+    String owner = fsStatus.getOwner();
+
+    if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+      throw new IOException("The ownership on the user's query " +
+          "directory " + stagingDir + " is not as expected. " +
+          "It is owned by " + owner + ". The directory must " +
+          "be owned by the submitter " + currentUser + " or " +
+          "by " + realUser);
+    }
+
+    if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+      LOG.info("Permissions on staging directory " + stagingDir + " are " +
+          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+          "to correct value " + STAGING_DIR_PERMISSION);
+      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    }
+
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    fs.mkdirs(stagingResultDir);
+
+    return stagingDir.toUri();
+  }
+
+  @Override
   public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
   }
 
@@ -1257,4 +1311,6 @@ public class FileTablespace extends Tablespace {
 
     return retValue;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index bd5502d..1d32291 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -85,7 +85,7 @@ public class HashShuffleAppenderManager {
           fs.mkdirs(dataFile.getParent());
         }
 
-        FileTablespace space = (FileTablespace) TableSpaceManager.get(dataFile.toUri()).get();
+        FileTablespace space = (FileTablespace) TablespaceManager.get(dataFile.toUri()).get();
         FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile);
         appender.enableStats();
         appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index ab63d55..f50a20d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -122,7 +122,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -154,7 +154,7 @@ public class TestCompressionStorages {
     FileFragment[] tablets = new FileFragment[1];
     tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
 
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
 
     if (storeType.equalsIgnoreCase("CSV")) {
       if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 2d919cd..ca5885c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -103,7 +103,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple tuple;
@@ -125,7 +125,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertNotNull(scanner.next());
@@ -147,7 +147,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
     FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {
@@ -166,7 +166,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment = getFileFragment("testErrorTolerance3.json");
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index 9237e07..1119968 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -57,7 +57,7 @@ public class TestFileSystems {
   public TestFileSystems(FileSystem fs) throws IOException {
     this.fs = fs;
     this.conf = new TajoConf(fs.getConf());
-    sm = TableSpaceManager.getLocalFs();
+    sm = TablespaceManager.getLocalFs();
     testDir = getTestDir(this.fs, TEST_PATH);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index ec3e143..09b91ea 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -81,7 +81,7 @@ public class TestFileTablespace {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
     localFs.mkdirs(path.getParent());
-    FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getLocalFs();
+    FileTablespace fileStorageManager = (FileTablespace) TablespaceManager.getLocalFs();
     assertEquals(localFs.getUri(), fileStorageManager.getFileSystem().getUri());
 
 		Appender appender = fileStorageManager.getAppender(meta, schema, path);
@@ -224,24 +224,24 @@ public class TestFileTablespace {
     Optional<Tablespace> existingTs = Optional.absent();
     try {
       /* Local FileSystem */
-      FileTablespace space = TableSpaceManager.getLocalFs();
+      FileTablespace space = TablespaceManager.getLocalFs();
       assertEquals(localFs.getUri(), space.getFileSystem().getUri());
 
       FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri);
       distTablespace.init(conf);
-      existingTs = TableSpaceManager.addTableSpaceForTest(distTablespace);
+      existingTs = TablespaceManager.addTableSpaceForTest(distTablespace);
 
       /* Distributed FileSystem */
-      space = (FileTablespace) TableSpaceManager.get(uri).get();
+      space = (FileTablespace) TablespaceManager.get(uri).get();
       assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
 
-      space = (FileTablespace) TableSpaceManager.getByName("testGetFileTablespace").get();
+      space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace").get();
       assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
 
     } finally {
 
       if (existingTs.isPresent()) {
-        TableSpaceManager.addTableSpaceForTest(existingTs.get());
+        TablespaceManager.addTableSpaceForTest(existingTs.get());
       }
 
       cluster.shutdown(true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index c13ce16..7410778 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -65,7 +65,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+    FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -118,7 +118,7 @@ public class TestLineReader {
     meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) (TableSpaceManager.getLocalFs()).getAppender(
+    FileAppender appender = (FileAppender) (TablespaceManager.getLocalFs()).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -176,7 +176,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
 
     Path tablePath = new Path(testDir, "testLineDelimitedReader");
-    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+    FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -279,7 +279,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+    FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 79928ff..331d3e8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -94,7 +94,7 @@ public class TestMergeScanner {
     conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = TableSpaceManager.getLocalFs();
+    sm = TablespaceManager.getLocalFs();
   }
 
   @Test
@@ -114,7 +114,7 @@ public class TestMergeScanner {
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
+    Appender appender1 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
     appender1.enableStats();
     appender1.init();
     int tupleNum = 10000;
@@ -136,7 +136,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
+    Appender appender2 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
     appender2.enableStats();
     appender2.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index ce2a926..dbfdac3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -155,7 +155,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      FileTablespace sm = TableSpaceManager.getLocalFs();
+      FileTablespace sm = TablespaceManager.getLocalFs();
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
@@ -210,7 +210,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      FileTablespace sm = TableSpaceManager.getLocalFs();
+      FileTablespace sm = TablespaceManager.getLocalFs();
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
@@ -271,7 +271,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testProjection.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
     int tupleNum = 10000;
@@ -347,7 +347,7 @@ public class TestStorages {
       meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
     }
 
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
@@ -425,7 +425,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -469,7 +469,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -513,7 +513,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -544,7 +544,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -582,7 +582,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -613,7 +613,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -651,7 +651,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -682,7 +682,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -724,7 +724,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -756,7 +756,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -786,7 +786,7 @@ public class TestStorages {
       TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 
       Path tablePath = new Path(testDir, "testTime.data");
-      FileTablespace sm = TableSpaceManager.getLocalFs();
+      FileTablespace sm = TablespaceManager.getLocalFs();
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.init();
 
@@ -801,7 +801,7 @@ public class TestStorages {
 
       FileStatus status = fs.getFileStatus(tablePath);
       FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-      Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+      Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
       scanner.init();
 
       Tuple retrieved;
@@ -827,7 +827,7 @@ public class TestStorages {
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     Path tablePath = new Path(testDir, "Seekable.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -869,7 +869,7 @@ public class TestStorages {
     long readBytes = 0;
     long readRows = 0;
     for (long offset : offsets) {
-      scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema,
+      scanner = TablespaceManager.getLocalFs().getScanner(meta, schema,
 	        new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
       scanner.init();
 
@@ -917,7 +917,7 @@ public class TestStorages {
       conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize);
     }
 
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Path tablePath = new Path(testDir, "testMaxValue.data");
     Appender appender = sm.getAppender(meta, schema, tablePath);
 
@@ -972,7 +972,7 @@ public class TestStorages {
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 
     Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, dataSchema, tablePath);
     appender.init();
 
@@ -998,7 +998,7 @@ public class TestStorages {
     inSchema.addColumn("col5", Type.INT8);
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, inSchema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment);
 
     Schema target = new Schema();
 
@@ -1036,7 +1036,7 @@ public class TestStorages {
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 
     Path tablePath = new Path(testDir, "test_storetype_oversize.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, dataSchema, tablePath);
     appender.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 1a62f52..22fb607 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -89,7 +89,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValue_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -178,7 +178,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
-    FileAppender appender = (FileAppender) ((FileTablespace)TableSpaceManager.getLocalFs())
+    FileAppender appender = (FileAppender) ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -257,7 +257,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i += 2) {
@@ -327,7 +327,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -418,7 +418,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
-    Appender appender = (((FileTablespace)TableSpaceManager.getLocalFs()))
+    Appender appender = (((FileTablespace) TablespaceManager.getLocalFs()))
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -498,7 +498,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -582,7 +582,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testMinMax_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -687,7 +687,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -768,7 +768,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -860,7 +860,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 5ad7a27..72810fd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
     fs.mkdirs(tablePath.getParent());
 
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
         "table1.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i ++ ) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
index 2fbf5d6..8095081 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@ -69,7 +69,7 @@ public class TestJsonSerDe {
     FileSystem fs = FileSystem.getLocal(conf);
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple tuple = scanner.next();