You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/25 02:55:48 UTC

[1/3] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces. (missed commits)

Repository: tajo
Updated Branches:
  refs/heads/master d44072762 -> 90afaa468


http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
new file mode 100644
index 0000000..26af769
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TablespaceManager.java
@@ -0,0 +1,390 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.Pair;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
+
+/**
+ * It handles available table spaces and cache TableSpace instances.
+ *
+ * Default tablespace must be a filesystem-based one.
+ * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
+ * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
+ */
+public class TablespaceManager implements StorageService {
+  private static final Log LOG = LogFactory.getLog(TablespaceManager.class);
+
+  public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
+  public static final String SITE_CONFIG_FILE = "storage-site.json";
+
+  /** default tablespace name */
+  public static final String DEFAULT_TABLESPACE_NAME = "default";
+
+  private final static TajoConf systemConf = new TajoConf();
+  private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
+
+  // The relation ship among name, URI, Tablespaces must be kept 1:1:1.
+  protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
+  protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
+
+  protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
+  protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
+
+  public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
+
+  static {
+    instance = new TablespaceManager();
+  }
+  /**
+   * Singleton instance
+   */
+  private static final TablespaceManager instance;
+
+  private TablespaceManager() {
+    initForDefaultConfig(); // loading storage-default.json
+    initSiteConfig();       // storage-site.json will override the configs of storage-default.json
+    addWarehouseAsSpace();  // adding a warehouse directory for a default tablespace
+    addLocalFsTablespace(); // adding a tablespace using local file system by default
+  }
+
+  private void addWarehouseAsSpace() {
+    Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
+    registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
+  }
+
+  private void addLocalFsTablespace() {
+    if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) {
+      String tmpName = UUID.randomUUID().toString();
+      registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
+    }
+  }
+
+  public static TablespaceManager getInstance() {
+    return instance;
+  }
+
+  private void initForDefaultConfig() {
+    JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
+    if (json == null) {
+      throw new IllegalStateException("There is no " + SITE_CONFIG_FILE);
+    }
+    applyConfig(json, false);
+  }
+
+  private void initSiteConfig() {
+    JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
+
+    // if there is no storage-site.json file, nothing happen.
+    if (json != null) {
+      applyConfig(json, true);
+    }
+  }
+
+  private JSONObject loadFromConfig(String fileName) {
+    String json;
+    try {
+      json = FileUtil.readTextFileFromResource(fileName);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (json != null) {
+      return parseJson(json);
+    } else {
+      return null;
+    }
+  }
+
+  private static JSONObject parseJson(String json) {
+    try {
+      return (JSONObject) parser.parse(json);
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void applyConfig(JSONObject json, boolean override) {
+    loadStorages(json);
+    loadTableSpaces(json, override);
+  }
+
+  private void loadStorages(JSONObject json) {
+    JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
+
+    if (spaces != null) {
+      Pair<String, Class<? extends Tablespace>> pair = null;
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+
+        try {
+          pair = extractStorage(entry);
+        } catch (ClassNotFoundException e) {
+          LOG.warn(e);
+          continue;
+        }
+
+        TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
+      }
+    }
+  }
+
+  private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
+      throws ClassNotFoundException {
+
+    String storageType = entry.getKey();
+    JSONObject storageDesc = (JSONObject) entry.getValue();
+    String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
+
+    return new Pair<String, Class<? extends Tablespace>>(
+        storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
+  }
+
+  private void loadTableSpaces(JSONObject json, boolean override) {
+    JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
+
+    if (spaces != null) {
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+        AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
+      }
+    }
+  }
+
+  public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
+    boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
+    URI spaceUri = URI.create(spaceDesc.getAsString("uri"));
+
+    if (defaultSpace) {
+      registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
+    }
+    registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
+  }
+
+  private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
+                                         boolean visible, boolean override) {
+    Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
+    tableSpace.setVisible(visible);
+
+    try {
+      tableSpace.init(systemConf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    putTablespace(tableSpace, override);
+
+    // If the arbitrary path is allowed, root uri is also added as a tablespace
+    if (tableSpace.getProperty().isArbitraryPathAllowed()) {
+      URI rootUri = tableSpace.getRootUri();
+      // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
+      if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
+        String tmpName = UUID.randomUUID().toString();
+        registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
+      }
+    }
+  }
+
+  private static void putTablespace(Tablespace space, boolean override) {
+    // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
+
+    boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
+    boolean uriExist = TABLE_SPACES.containsKey(space.uri);
+
+    boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
+    mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space);
+
+    if (!override && mismatch) {
+      throw new RuntimeException("Name or URI of Tablespace must be unique.");
+    }
+
+    SPACES_URIS_MAP.put(space.getName(), space.getUri());
+    // We must guarantee that the same uri results in the same tablespace instance.
+    TABLE_SPACES.put(space.getUri(), space);
+  }
+
+  /**
+   * Return length of the fragment.
+   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+   *
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return
+   */
+  public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
+    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+      return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+    } else {
+      return fragment.getLength();
+    }
+  }
+
+  public static final String KEY_STORAGES = "storages"; // storages
+  public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
+  public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
+
+  public static final String KEY_SPACES = "spaces";
+
+  private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
+    Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
+    Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
+
+    if (clazz == null) {
+      throw new RuntimeException("There is no tablespace for " + uri.toString());
+    }
+
+    try {
+      Constructor<? extends Tablespace> constructor =
+          (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
+
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
+        constructor.setAccessible(true);
+        CONSTRUCTORS.put(clazz, constructor);
+      }
+
+      return constructor.newInstance(new Object[]{spaceName, uri});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
+    Tablespace existing;
+    synchronized (SPACES_URIS_MAP) {
+      // Remove existing one
+      SPACES_URIS_MAP.remove(space.getName());
+      existing = TABLE_SPACES.remove(space.getUri());
+
+      // Add anotherone for test
+      registerTableSpace(space.name, space.uri, null, true, true);
+    }
+    // if there is an existing one, return it.
+    return Optional.fromNullable(existing);
+  }
+
+  public Iterable<String> getSupportSchemes() {
+    return TABLE_SPACE_HANDLERS.keySet();
+  }
+
+  /**
+   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+   *
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null, the default tablespace will be returned.
+   */
+  public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
+
+    if (uri == null || uri.isEmpty()) {
+      return (Optional<T>) Optional.of(getDefault());
+    }
+
+    Tablespace lastOne = null;
+
+    // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
+    // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
+    for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
+      if (uri.startsWith(entry.getKey().toString())) {
+        lastOne = entry.getValue();
+      }
+    }
+    return (Optional<T>) Optional.fromNullable(lastOne);
+  }
+
+  /**
+   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+   *
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null, the default tablespace will be returned.
+   */
+  public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
+    if (uri == null) {
+      return (Optional<T>) Optional.of(getDefault());
+    } else {
+      return (Optional<T>) get(uri.toString());
+    }
+  }
+
+  /**
+   * It returns the default tablespace. This method ensures that it always return the tablespace.
+   *
+   * @return
+   */
+  public static <T extends Tablespace> T getDefault() {
+    return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
+  }
+
+  public static <T extends Tablespace> T getLocalFs() {
+    return (T) get(LOCAL_FS_URI).get();
+  }
+
+  public static Optional<? extends Tablespace> getByName(String name) {
+    URI uri = SPACES_URIS_MAP.get(name);
+    if (uri != null) {
+      return Optional.of(TABLE_SPACES.get(uri));
+    } else {
+      return Optional.absent();
+    }
+  }
+
+  public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
+    for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
+      String uriScheme = entry.getKey().getScheme();
+      if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
+        return Optional.of(entry.getValue());
+      }
+    }
+
+    return Optional.absent();
+  }
+
+  @Override
+  public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
+    Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get();
+    return space.getTableUri(databaseName, tableName);
+  }
+
+  public static Iterable<Tablespace> getAllTablespaces() {
+    return TABLE_SPACES.values();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 916aae7..7943134 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
@@ -47,7 +47,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
   public void init() throws IOException {
     super.init();
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(uri).get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(uri).get();
     HConnection hconn = space.getConnection();
     htable = hconn.getTable(columnMapping.getHbaseTableName());
     htable.setAutoFlushTo(false);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 16f4c14..7369897 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -133,7 +133,7 @@ public class HBaseScanner implements Scanner {
     rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
     rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.get(fragment.getUri()).get();
     hbaseConf = space.getHbaseConf();
     initScanner();
   }
@@ -181,7 +181,7 @@ public class HBaseScanner implements Scanner {
     }
 
     if (htable == null) {
-      HConnection hconn = ((HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get()).getConnection();
+      HConnection hconn = ((HBaseTablespace) TablespaceManager.get(fragment.getUri()).get()).getConnection();
       htable = hconn.getTable(fragment.getHbaseTableName());
     }
     scanner = htable.getScanner(scan);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
index 5fac0cf..18bb7ed 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTablespace.java
@@ -51,10 +51,7 @@ import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.BytesUtils;
-import org.apache.tajo.util.Pair;
-import org.apache.tajo.util.TUtil;
+import org.apache.tajo.util.*;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -68,9 +65,9 @@ import java.util.*;
 public class HBaseTablespace extends Tablespace {
   private static final Log LOG = LogFactory.getLog(HBaseTablespace.class);
 
-  public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty(false, true, true, false);
-
-  public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true);
+  public static final StorageProperty HBASE_STORAGE_PROPERTIES = new StorageProperty("hbase", false, true, false);
+  public static final FormatProperty HFILE_FORMAT_PROPERTIES = new FormatProperty(true, false, true);
+  public static final FormatProperty PUT_MODE_PROPERTIES = new FormatProperty(true, true, false);
 
   private Configuration hbaseConf;
 
@@ -572,6 +569,15 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
+  public Appender getAppenderForInsertRow(OverridableConf queryContext,
+                                          TaskAttemptId taskAttemptId,
+                                          TableMeta meta,
+                                          Schema schema,
+                                          Path workDir) throws IOException {
+    return new HBasePutAppender(conf, uri, taskAttemptId, schema, meta, workDir);
+  }
+
+  @Override
   public Appender getAppender(OverridableConf queryContext,
                               TaskAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir)
       throws IOException {
@@ -1096,8 +1102,14 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
-  public FormatProperty getFormatProperty(String format) {
-    return HFILE_FORMAT_PROPERTIES;
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    KeyValueSet tableProperty = meta.getOptions();
+    if (tableProperty.isTrue(HBaseStorageConstants.INSERT_PUT_MODE) ||
+        tableProperty.isTrue(StorageConstants.INSERT_DIRECTLY)) {
+      return PUT_MODE_PROPERTIES;
+    } else {
+      return HFILE_FORMAT_PROPERTIES;
+    }
   }
 
   public void prepareTable(LogicalNode node) throws IOException {
@@ -1134,6 +1146,24 @@ public class HBaseTablespace extends Tablespace {
   }
 
   @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    if (meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) {
+      throw new IOException("Staging phase is not supported in this storage.");
+    } else {
+      return TablespaceManager.getDefault().getStagingUri(context, queryId, meta);
+    }
+  }
+
+  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context,
+                                 TableMeta meta) throws IOException {
+    if (!meta.getOptions().isTrue(HBaseStorageConstants.INSERT_PUT_MODE)) {
+      return TablespaceManager.getDefault().prepareStagingSpace(conf, queryId, context, meta);
+    } else {
+      throw new IOException("Staging phase is not supported in this storage.");
+    }
+  }
+
+  @Override
   public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException  {
     if (tableDesc != null) {
       Schema tableSchema = tableDesc.getSchema();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
index f7cbb5a..f0c8f15 100644
--- a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
+++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseTableSpace.java
@@ -25,7 +25,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.Pair;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -45,7 +45,7 @@ public class TestHBaseTableSpace {
     String tableSpaceUri = "hbase:zk://host1:2171";
     HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
     hBaseTablespace.init(new TajoConf());
-    TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+    TablespaceManager.addTableSpaceForTest(hBaseTablespace);
   }
 
   @Test
@@ -58,8 +58,8 @@ public class TestHBaseTableSpace {
 
   @Test
   public void testTablespaceHandler() throws Exception {
-    assertTrue((TableSpaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
-    assertTrue((TableSpaceManager.get(URI.create("hbase:zk://host1:2171")).get())
+    assertTrue((TablespaceManager.getByName("cluster1").get()) instanceof HBaseTablespace);
+    assertTrue((TablespaceManager.get(URI.create("hbase:zk://host1:2171")).get())
         instanceof HBaseTablespace);
   }
 
@@ -73,7 +73,7 @@ public class TestHBaseTableSpace {
     EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
     scanNode.setQual(evalNodeA);
 
-    HBaseTablespace storageManager = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HBaseTablespace storageManager = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
     List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn});
     assertNotNull(indexEvals);
     assertEquals(1, indexEvals.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index 081fa3f..efe2bfd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -58,7 +58,7 @@ public abstract class FileAppender implements Appender {
         throw new IllegalArgumentException("Configuration must be an instance of TajoConf");
       }
 
-      Optional<FileTablespace> spaceResult = TableSpaceManager.get(workDir.toUri());
+      Optional<FileTablespace> spaceResult = TablespaceManager.get(workDir.toUri());
 
       if (!spaceResult.isPresent()) {
         throw new IllegalStateException("No TableSpace for " + workDir.toUri());

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
index 2ce1f09..3b63012 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
@@ -19,14 +19,17 @@
 package org.apache.tajo.storage;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -92,8 +95,12 @@ public class FileTablespace extends Tablespace {
         }
       };
 
+  private static final StorageProperty FileStorageProperties = new StorageProperty("TEXT", true, true, true);
+  private static final FormatProperty GeneralFileProperties = new FormatProperty(true, false, true);
+
   protected FileSystem fs;
-  protected Path basePath;
+  protected Path spacePath;
+  protected Path stagingRootPath;
   protected boolean blocksMetadataEnabled;
   private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
 
@@ -103,8 +110,9 @@ public class FileTablespace extends Tablespace {
 
   @Override
   protected void storageInit() throws IOException {
-    this.basePath = new Path(uri);
-    this.fs = basePath.getFileSystem(conf);
+    this.spacePath = new Path(uri);
+    this.fs = spacePath.getFileSystem(conf);
+    this.stagingRootPath = fs.makeQualified(new Path(conf.getVar(TajoConf.ConfVars.STAGING_ROOT_DIR)));
     this.conf.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, fs.getUri().toString());
 
     this.blocksMetadataEnabled =
@@ -167,7 +175,7 @@ public class FileTablespace extends Tablespace {
 
   @Override
   public URI getTableUri(String databaseName, String tableName) {
-    return StorageUtil.concatPath(basePath, databaseName, tableName).toUri();
+    return StorageUtil.concatPath(spacePath, databaseName, tableName).toUri();
   }
 
   private String partitionPath = "";
@@ -192,12 +200,12 @@ public class FileTablespace extends Tablespace {
   }
 
   public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(basePath, tableName);
+    Path tablePath = new Path(spacePath, tableName);
     return split(tableName, tablePath, fs.getDefaultBlockSize());
   }
 
   public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(basePath, tableName);
+    Path tablePath = new Path(spacePath, tableName);
     return split(tableName, tablePath, fragmentSize);
   }
 
@@ -491,30 +499,6 @@ public class FileTablespace extends Tablespace {
   }
 
   /**
-   * Generate the map of host and make them into Volume Ids.
-   *
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        if (diskIds.length > 0 && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-  /**
    * Generate the list of files and make them into FileSplits.
    *
    * @throws IOException
@@ -674,7 +658,7 @@ public class FileTablespace extends Tablespace {
       String simpleTableName = splitted[1];
 
       // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
-      Path tablePath = StorageUtil.concatPath(basePath, databaseName, simpleTableName);
+      Path tablePath = StorageUtil.concatPath(spacePath, databaseName, simpleTableName);
       tableDesc.setUri(tablePath.toUri());
     } else {
       Preconditions.checkState(tableDesc.getUri() != null, "ERROR: LOCATION must be given.");
@@ -851,22 +835,14 @@ public class FileTablespace extends Tablespace {
     }
   }
 
-  private static final StorageProperty FileStorageProperties = new StorageProperty(true, true, true, true);
-  private static final FormatProperty GeneralFileProperties = new FormatProperty(false);
-  private static final FormatProperty HFileProperties = new FormatProperty(true);
-
   @Override
   public StorageProperty getProperty() {
     return FileStorageProperties;
   }
 
   @Override
-  public FormatProperty getFormatProperty(String format) {
-    if (format.equalsIgnoreCase("hbase")) {
-      return HFileProperties;
-    } else {
-      return GeneralFileProperties;
-    }
+  public FormatProperty getFormatProperty(TableMeta meta) {
+    return GeneralFileProperties;
   }
 
   @Override
@@ -882,6 +858,84 @@ public class FileTablespace extends Tablespace {
   }
 
   @Override
+  public URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException {
+    String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
+
+    Path stagingDir;
+    // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
+    // So, this query results won't be materialized as a part of a table.
+    // The result will be temporarily written in the staging directory.
+    if (outputPath.isEmpty()) {
+      // for temporarily written in the storage directory
+      stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+    } else {
+      Optional<Tablespace> spaceResult = TablespaceManager.get(outputPath);
+      if (!spaceResult.isPresent()) {
+        throw new IOException("No registered Tablespace for " + outputPath);
+      }
+
+      Tablespace space = spaceResult.get();
+      if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
+        // If this space allows move operation, the staging directory will be underneath the final output table uri.
+        stagingDir = fs.makeQualified(StorageUtil.concatPath(outputPath, TMP_STAGING_DIR_PREFIX, queryId));
+      } else {
+        stagingDir = fs.makeQualified(new Path(stagingRootPath, queryId));
+      }
+    }
+
+    return stagingDir.toUri();
+  }
+
+  // query submission directory is private!
+  final public static FsPermission STAGING_DIR_PERMISSION = FsPermission.createImmutable((short) 0700); // rwx--------
+  public static final String TMP_STAGING_DIR_PREFIX = ".staging";
+
+  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context, TableMeta meta)
+      throws IOException {
+
+    String realUser;
+    String currentUser;
+    UserGroupInformation ugi;
+    ugi = UserGroupInformation.getLoginUser();
+    realUser = ugi.getShortUserName();
+    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+
+
+    Path stagingDir = new Path(getStagingUri(context, queryId, meta));
+
+    ////////////////////////////////////////////
+    // Create Output Directory
+    ////////////////////////////////////////////
+
+    if (fs.exists(stagingDir)) {
+      throw new IOException("The staging directory '" + stagingDir + "' already exists");
+    }
+    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    FileStatus fsStatus = fs.getFileStatus(stagingDir);
+    String owner = fsStatus.getOwner();
+
+    if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
+      throw new IOException("The ownership on the user's query " +
+          "directory " + stagingDir + " is not as expected. " +
+          "It is owned by " + owner + ". The directory must " +
+          "be owned by the submitter " + currentUser + " or " +
+          "by " + realUser);
+    }
+
+    if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+      LOG.info("Permissions on staging directory " + stagingDir + " are " +
+          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+          "to correct value " + STAGING_DIR_PERMISSION);
+      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    }
+
+    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+    fs.mkdirs(stagingResultDir);
+
+    return stagingDir.toUri();
+  }
+
+  @Override
   public void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException {
   }
 
@@ -1257,4 +1311,6 @@ public class FileTablespace extends Tablespace {
 
     return retValue;
   }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index bd5502d..1d32291 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -85,7 +85,7 @@ public class HashShuffleAppenderManager {
           fs.mkdirs(dataFile.getParent());
         }
 
-        FileTablespace space = (FileTablespace) TableSpaceManager.get(dataFile.toUri()).get();
+        FileTablespace space = (FileTablespace) TablespaceManager.get(dataFile.toUri()).get();
         FileAppender appender = (FileAppender) space.getAppender(meta, outSchema, dataFile);
         appender.enableStats();
         appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index ab63d55..f50a20d 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -122,7 +122,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -154,7 +154,7 @@ public class TestCompressionStorages {
     FileFragment[] tablets = new FileFragment[1];
     tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
 
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, tablets[0], schema);
 
     if (storeType.equalsIgnoreCase("CSV")) {
       if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
index 2d919cd..ca5885c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestDelimitedTextFile.java
@@ -103,7 +103,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "-1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple tuple;
@@ -125,7 +125,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment =  getFileFragment("testErrorTolerance1.json");
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertNotNull(scanner.next());
@@ -147,7 +147,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "0");
     FileFragment fragment =  getFileFragment("testErrorTolerance2.json");
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {
@@ -166,7 +166,7 @@ public class TestDelimitedTextFile {
     TableMeta meta = CatalogUtil.newTableMeta("JSON");
     meta.putOption(StorageUtil.TEXT_ERROR_TOLERANCE_MAXNUM, "1");
     FileFragment fragment = getFileFragment("testErrorTolerance3.json");
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index 9237e07..1119968 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -57,7 +57,7 @@ public class TestFileSystems {
   public TestFileSystems(FileSystem fs) throws IOException {
     this.fs = fs;
     this.conf = new TajoConf(fs.getConf());
-    sm = TableSpaceManager.getLocalFs();
+    sm = TablespaceManager.getLocalFs();
     testDir = getTestDir(this.fs, TEST_PATH);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
index ec3e143..09b91ea 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestFileTablespace.java
@@ -81,7 +81,7 @@ public class TestFileTablespace {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
     localFs.mkdirs(path.getParent());
-    FileTablespace fileStorageManager = (FileTablespace) TableSpaceManager.getLocalFs();
+    FileTablespace fileStorageManager = (FileTablespace) TablespaceManager.getLocalFs();
     assertEquals(localFs.getUri(), fileStorageManager.getFileSystem().getUri());
 
 		Appender appender = fileStorageManager.getAppender(meta, schema, path);
@@ -224,24 +224,24 @@ public class TestFileTablespace {
     Optional<Tablespace> existingTs = Optional.absent();
     try {
       /* Local FileSystem */
-      FileTablespace space = TableSpaceManager.getLocalFs();
+      FileTablespace space = TablespaceManager.getLocalFs();
       assertEquals(localFs.getUri(), space.getFileSystem().getUri());
 
       FileTablespace distTablespace = new FileTablespace("testGetFileTablespace", uri);
       distTablespace.init(conf);
-      existingTs = TableSpaceManager.addTableSpaceForTest(distTablespace);
+      existingTs = TablespaceManager.addTableSpaceForTest(distTablespace);
 
       /* Distributed FileSystem */
-      space = (FileTablespace) TableSpaceManager.get(uri).get();
+      space = (FileTablespace) TablespaceManager.get(uri).get();
       assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
 
-      space = (FileTablespace) TableSpaceManager.getByName("testGetFileTablespace").get();
+      space = (FileTablespace) TablespaceManager.getByName("testGetFileTablespace").get();
       assertEquals(cluster.getFileSystem().getUri(), space.getFileSystem().getUri());
 
     } finally {
 
       if (existingTs.isPresent()) {
-        TableSpaceManager.addTableSpaceForTest(existingTs.get());
+        TablespaceManager.addTableSpaceForTest(existingTs.get());
       }
 
       cluster.shutdown(true);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
index c13ce16..7410778 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestLineReader.java
@@ -65,7 +65,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "line.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+    FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -118,7 +118,7 @@ public class TestLineReader {
     meta.putOption("compression.codec", DeflateCodec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "testLineDelimitedReaderWithCompression." + DeflateCodec.class.getSimpleName());
-    FileAppender appender = (FileAppender) (TableSpaceManager.getLocalFs()).getAppender(
+    FileAppender appender = (FileAppender) (TablespaceManager.getLocalFs()).getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -176,7 +176,7 @@ public class TestLineReader {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
 
     Path tablePath = new Path(testDir, "testLineDelimitedReader");
-    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+    FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -279,7 +279,7 @@ public class TestLineReader {
 
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     Path tablePath = new Path(testDir, "testSeekableByteBufLineReader.data");
-    FileAppender appender = (FileAppender) TableSpaceManager.getLocalFs().getAppender(
+    FileAppender appender = (FileAppender) TablespaceManager.getLocalFs().getAppender(
         null, null, meta, schema, tablePath);
     appender.enableStats();
     appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 79928ff..331d3e8 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -94,7 +94,7 @@ public class TestMergeScanner {
     conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = TableSpaceManager.getLocalFs();
+    sm = TablespaceManager.getLocalFs();
   }
 
   @Test
@@ -114,7 +114,7 @@ public class TestMergeScanner {
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
+    Appender appender1 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table1Path);
     appender1.enableStats();
     appender1.init();
     int tupleNum = 10000;
@@ -136,7 +136,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = TableSpaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
+    Appender appender2 = TablespaceManager.getLocalFs().getAppender(null, null, meta, schema, table2Path);
     appender2.enableStats();
     appender2.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index ce2a926..dbfdac3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -155,7 +155,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      FileTablespace sm = TableSpaceManager.getLocalFs();
+      FileTablespace sm = TablespaceManager.getLocalFs();
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
@@ -210,7 +210,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      FileTablespace sm = TableSpaceManager.getLocalFs();
+      FileTablespace sm = TablespaceManager.getLocalFs();
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
@@ -271,7 +271,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testProjection.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
     int tupleNum = 10000;
@@ -347,7 +347,7 @@ public class TestStorages {
       meta.putOption(StorageConstants.AVRO_SCHEMA_URL, path);
     }
 
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Path tablePath = new Path(testDir, "testVariousTypes.data");
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
@@ -425,7 +425,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -469,7 +469,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -513,7 +513,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -544,7 +544,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -582,7 +582,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -613,7 +613,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -651,7 +651,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -682,7 +682,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -724,7 +724,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -756,7 +756,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -786,7 +786,7 @@ public class TestStorages {
       TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 
       Path tablePath = new Path(testDir, "testTime.data");
-      FileTablespace sm = TableSpaceManager.getLocalFs();
+      FileTablespace sm = TablespaceManager.getLocalFs();
       Appender appender = sm.getAppender(meta, schema, tablePath);
       appender.init();
 
@@ -801,7 +801,7 @@ public class TestStorages {
 
       FileStatus status = fs.getFileStatus(tablePath);
       FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-      Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+      Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
       scanner.init();
 
       Tuple retrieved;
@@ -827,7 +827,7 @@ public class TestStorages {
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     Path tablePath = new Path(testDir, "Seekable.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     FileAppender appender = (FileAppender) sm.getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
@@ -869,7 +869,7 @@ public class TestStorages {
     long readBytes = 0;
     long readRows = 0;
     for (long offset : offsets) {
-      scanner = TableSpaceManager.getLocalFs().getScanner(meta, schema,
+      scanner = TablespaceManager.getLocalFs().getScanner(meta, schema,
 	        new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
       scanner.init();
 
@@ -917,7 +917,7 @@ public class TestStorages {
       conf.setInt(RawFile.WRITE_BUFFER_SIZE, record + headerSize);
     }
 
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Path tablePath = new Path(testDir, "testMaxValue.data");
     Appender appender = sm.getAppender(meta, schema, tablePath);
 
@@ -972,7 +972,7 @@ public class TestStorages {
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 
     Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, dataSchema, tablePath);
     appender.init();
 
@@ -998,7 +998,7 @@ public class TestStorages {
     inSchema.addColumn("col5", Type.INT8);
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = TableSpaceManager.getLocalFs().getScanner(meta, inSchema, fragment);
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, inSchema, fragment);
 
     Schema target = new Schema();
 
@@ -1036,7 +1036,7 @@ public class TestStorages {
     meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
 
     Path tablePath = new Path(testDir, "test_storetype_oversize.data");
-    FileTablespace sm = TableSpaceManager.getLocalFs();
+    FileTablespace sm = TablespaceManager.getLocalFs();
     Appender appender = sm.getAppender(meta, dataSchema, tablePath);
     appender.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 1a62f52..22fb607 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -89,7 +89,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValue_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -178,7 +178,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
-    FileAppender appender = (FileAppender) ((FileTablespace)TableSpaceManager.getLocalFs())
+    FileAppender appender = (FileAppender) ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -257,7 +257,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i += 2) {
@@ -327,7 +327,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -418,7 +418,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
-    Appender appender = (((FileTablespace)TableSpaceManager.getLocalFs()))
+    Appender appender = (((FileTablespace) TablespaceManager.getLocalFs()))
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -498,7 +498,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -582,7 +582,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testMinMax_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
@@ -687,7 +687,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -768,7 +768,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(meta, schema, tablePath);
     appender.init();
 
@@ -860,7 +860,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 5ad7a27..72810fd 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
     fs.mkdirs(tablePath.getParent());
 
-    Appender appender = ((FileTablespace)TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
         "table1.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i ++ ) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
index 2fbf5d6..8095081 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/json/TestJsonSerDe.java
@@ -69,7 +69,7 @@ public class TestJsonSerDe {
     FileSystem fs = FileSystem.getLocal(conf);
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  TableSpaceManager.getLocalFs().getScanner(meta, schema, fragment);
+    Scanner scanner =  TablespaceManager.getLocalFs().getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple tuple = scanner.next();


[3/3] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces. (missed commits)

Posted by hy...@apache.org.
TAJO-1616: Implement TablespaceManager to load Tablespaces. (missed commits)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/90afaa46
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/90afaa46
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/90afaa46

Branch: refs/heads/master
Commit: 90afaa468080d4f743ed2eee8326a38995900807
Parents: d440727
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Jun 24 17:55:10 2015 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Jun 24 17:55:10 2015 -0700

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 .../apache/tajo/storage/StorageConstants.java   |   3 +
 .../java/org/apache/tajo/util/KeyValueSet.java  |   8 +-
 .../engine/planner/PhysicalPlannerImpl.java     |   6 +-
 .../planner/physical/ColPartitionStoreExec.java |   2 +-
 .../engine/planner/physical/InsertRowsExec.java | 107 +++++
 .../physical/RangeShuffleFileWriteExec.java     |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   2 +-
 .../engine/planner/physical/StoreTableExec.java |  10 +-
 .../apache/tajo/engine/query/QueryContext.java  |   4 +-
 .../org/apache/tajo/master/GlobalEngine.java    |  10 +-
 .../apache/tajo/master/exec/DDLExecutor.java    |  10 +-
 .../exec/NonForwardQueryResultFileScanner.java  |   2 +-
 .../apache/tajo/master/exec/QueryExecutor.java  | 179 +++++----
 .../master/exec/prehook/CreateTableHook.java    |   2 -
 .../java/org/apache/tajo/querymaster/Query.java |   6 +-
 .../tajo/querymaster/QueryMasterTask.java       | 118 +-----
 .../apache/tajo/querymaster/Repartitioner.java  |  10 +-
 .../java/org/apache/tajo/querymaster/Stage.java |   4 +-
 .../org/apache/tajo/worker/LegacyTaskImpl.java  |   2 +-
 .../java/org/apache/tajo/worker/TaskImpl.java   |   2 +-
 .../src/main/resources/webapps/admin/index.jsp  |   4 +-
 .../org/apache/tajo/BackendTestingUtil.java     |   2 +-
 .../java/org/apache/tajo/QueryTestCaseBase.java |  35 ++
 .../org/apache/tajo/TajoTestingCluster.java     |  18 +-
 .../org/apache/tajo/cli/tsql/TestTajoCli.java   |   4 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   5 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |   5 +-
 .../engine/planner/TestLogicalOptimizer.java    |   4 +-
 .../tajo/engine/planner/TestLogicalPlan.java    |   4 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |   4 +-
 .../tajo/engine/planner/TestPlannerUtil.java    |   4 +-
 .../planner/physical/TestBNLJoinExec.java       |   6 +-
 .../planner/physical/TestBSTIndexExec.java      |   4 +-
 .../planner/physical/TestExternalSortExec.java  |   4 +-
 .../physical/TestFullOuterHashJoinExec.java     |  10 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  12 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   6 +-
 .../planner/physical/TestHashJoinExec.java      |   6 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   6 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  10 +-
 .../planner/physical/TestMergeJoinExec.java     |   6 +-
 .../engine/planner/physical/TestNLJoinExec.java |   6 +-
 .../planner/physical/TestPhysicalPlanner.java   |  12 +-
 .../physical/TestProgressExternalSortExec.java  |   4 +-
 .../physical/TestRightOuterHashJoinExec.java    |   8 +-
 .../physical/TestRightOuterMergeJoinExec.java   |  12 +-
 .../engine/planner/physical/TestSortExec.java   |   6 +-
 .../tajo/engine/query/TestHBaseTable.java       |  62 ++-
 .../apache/tajo/engine/query/TestJoinQuery.java |   2 +-
 .../apache/tajo/ha/TestHAServiceHDFSImpl.java   |  88 +++--
 .../org/apache/tajo/jdbc/TestResultSet.java     |   2 +-
 .../tajo/master/TestExecutionBlockCursor.java   |   4 +-
 .../TestNonForwardQueryResultSystemScanner.java | 258 +-----------
 .../apache/tajo/querymaster/TestKillQuery.java  |   6 +-
 .../org/apache/tajo/storage/TestRowFile.java    |   2 +-
 .../TestHBaseTable/testInsertValues1.result     |   4 +
 .../testGetClusterDetails.result                |   4 +
 .../testGetNextRowsForAggregateFunction.result  |   3 +
 .../testGetNextRowsForTable.result              |   5 +
 .../java/org/apache/tajo/plan/LogicalPlan.java  |   8 +
 .../org/apache/tajo/plan/util/PlannerUtil.java  |  16 +
 .../org/apache/tajo/storage/FormatProperty.java |  42 +-
 .../org/apache/tajo/storage/MergeScanner.java   |   4 +-
 .../apache/tajo/storage/OldStorageManager.java  |   3 +-
 .../apache/tajo/storage/StorageProperty.java    |  42 +-
 .../apache/tajo/storage/TableSpaceManager.java  | 390 -------------------
 .../org/apache/tajo/storage/Tablespace.java     |  19 +-
 .../apache/tajo/storage/TablespaceManager.java  | 390 +++++++++++++++++++
 .../tajo/storage/hbase/HBasePutAppender.java    |   4 +-
 .../apache/tajo/storage/hbase/HBaseScanner.java |   4 +-
 .../tajo/storage/hbase/HBaseTablespace.java     |  48 ++-
 .../tajo/storage/hbase/TestHBaseTableSpace.java |  10 +-
 .../org/apache/tajo/storage/FileAppender.java   |   2 +-
 .../org/apache/tajo/storage/FileTablespace.java | 138 +++++--
 .../storage/HashShuffleAppenderManager.java     |   2 +-
 .../tajo/storage/TestCompressionStorages.java   |   4 +-
 .../tajo/storage/TestDelimitedTextFile.java     |   8 +-
 .../apache/tajo/storage/TestFileSystems.java    |   2 +-
 .../apache/tajo/storage/TestFileTablespace.java |  12 +-
 .../org/apache/tajo/storage/TestLineReader.java |   8 +-
 .../apache/tajo/storage/TestMergeScanner.java   |   6 +-
 .../org/apache/tajo/storage/TestStorages.java   |  44 +--
 .../apache/tajo/storage/index/TestBSTIndex.java |  20 +-
 .../index/TestSingleCSVFileBSTIndex.java        |   4 +-
 .../apache/tajo/storage/json/TestJsonSerDe.java |   2 +-
 86 files changed, 1236 insertions(+), 1136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 8fc9b94..61d56fb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,7 +19,7 @@ git:
   depth: 150
 
 jdk:
-  - openjdk7
+  - oraclejdk7
 
 env: PATH=$PATH:$HOME/local/bin
 
@@ -33,7 +33,7 @@ notifications:
   - issues@tajo.apache.org
   irc: "chat.freenode.net#tajo"
 
-
+before_install: ulimit -t 514029
 install: ./dev-support/travis-install-dependencies.sh
 
 script: 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index a9923a5..16cf51d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -30,6 +30,9 @@ public class StorageConstants {
 
   // Common table properties -------------------------------------------------
 
+  // Insert
+  public static final String INSERT_DIRECTLY = "insert.direct";
+
   // time zone
   public static final String TIMEZONE = "timezone";
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 0e27769..404606d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -115,6 +115,10 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
     return get(key, null);
   }
 
+  public boolean isTrue(String key) {
+    return getBool(key, false);
+  }
+
   public void setBool(String key, boolean val) {
     set(key, val ? TRUE_STR : FALSE_STR);
   }
@@ -137,9 +141,9 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
   public boolean getBool(ConfigKey key) {
     String keyName = key.keyname();
     if (key instanceof SessionVars) {
-      return getBool(keyName, ((SessionVars)key).getConfVars().defaultBoolVal);
+      return getBool(keyName, ((SessionVars) key).getConfVars().defaultBoolVal);
     } else if (key instanceof TajoConf.ConfVars) {
-      return getBool(keyName, ((TajoConf.ConfVars)key).defaultBoolVal);
+      return getBool(keyName, ((TajoConf.ConfVars) key).defaultBoolVal);
     }
     return getBool(keyName);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index f0b2f5e..c6b9b41 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -252,7 +252,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
       FragmentProto[] fragmentProtos = ctx.getTables(tableId);
       List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
       for (Fragment frag : fragments) {
-        size += TableSpaceManager.guessFragmentVolume(ctx.getConf(), frag);
+        size += TablespaceManager.guessFragmentVolume(ctx.getConf(), frag);
       }
     }
     return size;
@@ -926,7 +926,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
           PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode;
           List<Fragment> fileFragments = TUtil.newList();
 
-          FileTablespace space = (FileTablespace) TableSpaceManager.get(scanNode.getTableDesc().getUri()).get();
+          FileTablespace space = (FileTablespace) TablespaceManager.get(scanNode.getTableDesc().getUri()).get();
           for (Path path : partitionedTableScanNode.getInputPaths()) {
             fileFragments.addAll(TUtil.newList(space.split(scanNode.getCanonicalName(), path)));
           }
@@ -1190,7 +1190,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         FragmentConvertor.convert(ctx.getConf(), fragmentProtos);
 
     String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys());
-    FileTablespace sm = (FileTablespace) TableSpaceManager.get(fragments.get(0).getPath().toUri()).get();
+    FileTablespace sm = (FileTablespace) TablespaceManager.get(fragments.get(0).getPath().toUri()).get();
     String dbName = CatalogUtil.extractQualifier(annotation.getTableName());
     String simpleName = CatalogUtil.extractSimpleName(annotation.getTableName());
     Path indexPath = new Path(new Path(sm.getTableUri(dbName, simpleName)), "index");

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 969998c..76abc6d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -165,7 +165,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
       actualFilePath = new Path(lastFileName + "_" + suffixId);
     }
 
-    appender = ((FileTablespace) TableSpaceManager.get(lastFileName.toUri()).get())
+    appender = ((FileTablespace) TablespaceManager.get(lastFileName.toUri()).get())
         .getAppender(meta, outSchema, actualFilePath);
 
     appender.enableStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java
new file mode 100644
index 0000000..f3a24a7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/InsertRowsExec.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.plan.logical.PersistentStoreNode;
+import org.apache.tajo.plan.logical.StoreTableNode;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * This is a physical executor to store rows immediately.
+ */
+public class InsertRowsExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(InsertRowsExec.class);
+
+  private PersistentStoreNode plan;
+  private TableMeta meta;
+  private Appender appender;
+  private Tuple tuple;
+
+  // for file punctuation
+  private TableStats sumStats; // for aggregating all stats of written files
+
+  public InsertRowsExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException {
+    super(context, plan.getInSchema(), plan.getOutSchema(), child);
+    this.plan = plan;
+  }
+
+  public void init() throws IOException {
+    super.init();
+
+    if (plan.hasOptions()) {
+      meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
+    } else {
+      meta = CatalogUtil.newTableMeta(plan.getStorageType());
+    }
+
+    PhysicalPlanUtil.setNullCharIfNecessary(context.getQueryContext(), plan, meta);
+    sumStats = new TableStats();
+
+    StoreTableNode storeTableNode = (StoreTableNode) plan;
+    appender = TablespaceManager.get(storeTableNode.getUri()).get().getAppenderForInsertRow(
+        context.getQueryContext(),
+        context.getTaskId(), meta, storeTableNode.getTableSchema(), context.getOutputPath());
+    appender.enableStats();
+    appender.init();
+  }
+
+  /* (non-Javadoc)
+   * @see PhysicalExec#next()
+   */
+  @Override
+  public Tuple next() throws IOException {
+    while((tuple = child.next()) != null) {
+      appender.addTuple(tuple);
+    }
+        
+    return null;
+  }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
+
+  public void close() throws IOException {
+    super.close();
+
+    if(appender != null){
+      appender.flush();
+      appender.close();
+
+      // Collect statistics data
+      StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
+      context.setResultStats(sumStats);
+    }
+
+    appender = null;
+    plan = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index fb29e4f..bbb21fe 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -77,7 +77,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
         context.getDataChannel().getStoreType() : "RAW");
     FileSystem fs = new RawLocalFileSystem();
     fs.mkdirs(storeTablePath);
-    this.appender = (FileAppender) ((FileTablespace) TableSpaceManager.getDefault())
+    this.appender = (FileAppender) ((FileTablespace) TablespaceManager.getDefault())
         .getAppender(meta, outSchema, new Path(storeTablePath, "output"));
     this.appender.enableStats();
     this.appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index d2ae3bd..599f160 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -202,7 +202,7 @@ public class SeqScanExec extends ScanExec {
             FragmentConvertor.convert(context.getConf(), fragments), projected
         );
       } else {
-        Tablespace tablespace = TableSpaceManager.get(plan.getTableDesc().getUri()).get();
+        Tablespace tablespace = TablespaceManager.get(plan.getTableDesc().getUri()).get();
         this.scanner = tablespace.getScanner(meta, plan.getPhysicalSchema(), fragments[0], projected);
       }
       scanner.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index dd8768e..6031fdb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -31,12 +31,14 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.plan.logical.InsertNode;
 import org.apache.tajo.plan.logical.PersistentStoreNode;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.Appender;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TablespaceManager;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.net.URI;
 
 /**
  * This is a physical executor to store a table part into a specified storage.
@@ -92,7 +94,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
         lastFileName = new Path(lastFileName + "_" + suffixId);
       }
 
-      Optional<FileTablespace> spaceRes = TableSpaceManager.get(lastFileName.toUri());
+      Optional<FileTablespace> spaceRes = TablespaceManager.get(lastFileName.toUri());
       if (!spaceRes.isPresent())  {
         throw new IllegalStateException("No Tablespace for " + lastFileName.toUri());
       }
@@ -106,7 +108,7 @@ public class StoreTableExec extends UnaryPhysicalExec {
       }
     } else {
       Path stagingDir = context.getQueryContext().getStagingDir();
-      appender = TableSpaceManager.get(stagingDir.toUri()).get().getAppender(
+      appender = TablespaceManager.get(stagingDir.toUri()).get().getAppender(
           context.getQueryContext(),
           context.getTaskId(),
           meta,

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index 7696c6c..da2f2ad 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -99,8 +99,8 @@ public class QueryContext extends OverridableConf {
     return strVal != null && !strVal.isEmpty() ? URI.create(strVal) : null;
   }
 
-  public void setStagingDir(Path path) {
-    put(QueryVars.STAGING_DIR, path.toUri().toString());
+  public void setStagingDir(URI uri) {
+    put(QueryVars.STAGING_DIR, uri.toString());
   }
 
   public Path getStagingDir() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index e833884..37b497c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -53,7 +53,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.VerificationState;
 import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 
 import java.io.IOException;
@@ -91,7 +91,7 @@ public class GlobalEngine extends AbstractService {
     try  {
       analyzer = new SQLAnalyzer();
       preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
-      planner = new LogicalPlanner(context.getCatalog(), TableSpaceManager.getInstance());
+      planner = new LogicalPlanner(context.getCatalog(), TablespaceManager.getInstance());
       optimizer = new LogicalOptimizer(context.getConf());
       annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
     } catch (Throwable t) {
@@ -141,8 +141,8 @@ public class GlobalEngine extends AbstractService {
     QueryContext newQueryContext =  new QueryContext(context.getConf(), session);
 
     // Set default space uri and its root uri
-    newQueryContext.setDefaultSpaceUri(TableSpaceManager.getDefault().getUri());
-    newQueryContext.setDefaultSpaceRootUri(TableSpaceManager.getDefault().getRootUri());
+    newQueryContext.setDefaultSpaceUri(TablespaceManager.getDefault().getUri());
+    newQueryContext.setDefaultSpaceRootUri(TablespaceManager.getDefault().getRootUri());
 
     String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY);
     if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) {
@@ -303,7 +303,7 @@ public class GlobalEngine extends AbstractService {
           InsertNode iNode = rootNode.getChild();
           Schema outSchema = iNode.getChild().getOutSchema();
 
-          TableSpaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema);
+          TablespaceManager.get(tableDesc.getUri()).get().verifySchemaToWrite(tableDesc, outSchema);
 
         } catch (Throwable t) {
           state.addVerification(t.getMessage());

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 5e0e639..7104412 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -36,7 +36,7 @@ import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.StorageUtil;
 
@@ -251,11 +251,11 @@ public class DDLExecutor {
 
     Tablespace tableSpace;
     if (tableSpaceName != null) {
-      tableSpace = TableSpaceManager.getByName(tableSpaceName).get();
+      tableSpace = TablespaceManager.getByName(tableSpaceName).get();
     } else if (uri != null) {
-      tableSpace = TableSpaceManager.get(uri).get();
+      tableSpace = TablespaceManager.get(uri).get();
     } else {
-      tableSpace = TableSpaceManager.getDefault();
+      tableSpace = TablespaceManager.getDefault();
     }
 
     TableDesc desc;
@@ -313,7 +313,7 @@ public class DDLExecutor {
 
     if (purge) {
       try {
-        TableSpaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc);
+        TablespaceManager.get(tableDesc.getUri()).get().purgeTable(tableDesc);
       } catch (IOException e) {
         throw new InternalError(e.getMessage());
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
index ae57453..ec8760f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java
@@ -101,7 +101,7 @@ public class NonForwardQueryResultFileScanner implements NonForwardQueryResultSc
   }
 
   private void initSeqScanExec() throws IOException {
-    Tablespace tablespace = TableSpaceManager.get(tableDesc.getUri()).get();
+    Tablespace tablespace = TablespaceManager.get(tableDesc.getUri()).get();
     List<Fragment> fragments = null;
     setPartition(tablespace);
     fragments = tablespace.getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
index 480f45c..5d42157 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java
@@ -31,6 +31,7 @@ import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
@@ -39,7 +40,7 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.physical.EvalExprExec;
-import org.apache.tajo.engine.planner.physical.StoreTableExec;
+import org.apache.tajo.engine.planner.physical.InsertRowsExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
@@ -60,14 +61,13 @@ import org.apache.tajo.plan.function.python.TajoScriptEngine;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.verifier.VerifyException;
-import org.apache.tajo.querymaster.Query;
-import org.apache.tajo.querymaster.QueryMasterTask;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.ProtoUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -288,7 +288,7 @@ public class QueryExecutor {
       boolean isInsert = rootNode.getChild() != null && rootNode.getChild().getType() == NodeType.INSERT;
       if (isInsert) {
         InsertNode insertNode = rootNode.getChild();
-        insertNonFromQuery(queryContext, insertNode, responseBuilder);
+        insertRowValues(queryContext, insertNode, responseBuilder);
       } else {
         Schema schema = PlannerUtil.targetToSchema(targets);
         RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(schema);
@@ -330,89 +330,123 @@ public class QueryExecutor {
     }
   }
 
-  private void insertNonFromQuery(QueryContext queryContext,
-                                  InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
+  /**
+   * Insert rows through staging phase
+   */
+  private void insertRowsThroughStaging(TaskAttemptContext taskAttemptContext,
+                                        InsertNode insertNode,
+                                        Path finalOutputPath,
+                                        Path stagingDir,
+                                        Path stagingResultDir)
+      throws IOException {
+
+    EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+    InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
+
+    try {
+      exec.init();
+      exec.next();
+    } finally {
+      exec.close();
+    }
+
+    FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
+
+    if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
+      // it moves the original table into the temporary location.
+      // Then it moves the new result table into the original table location.
+      // Upon failed, it recovers the original table if possible.
+      boolean movedToOldTable = false;
+      boolean committed = false;
+      Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
+      try {
+        if (fs.exists(finalOutputPath)) {
+          fs.rename(finalOutputPath, oldTableDir);
+          movedToOldTable = fs.exists(oldTableDir);
+        } else { // if the parent does not exist, make its parent directory.
+          fs.mkdirs(finalOutputPath.getParent());
+        }
+        fs.rename(stagingResultDir, finalOutputPath);
+        committed = fs.exists(finalOutputPath);
+      } catch (IOException ioe) {
+        // recover the old table
+        if (movedToOldTable && !committed) {
+          fs.rename(oldTableDir, finalOutputPath);
+        }
+      }
+    } else {
+      FileStatus[] files = fs.listStatus(stagingResultDir);
+      for (FileStatus eachFile : files) {
+        Path targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName());
+        if (fs.exists(targetFilePath)) {
+          targetFilePath = new Path(finalOutputPath, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
+        }
+        fs.rename(eachFile.getPath(), targetFilePath);
+      }
+    }
+  }
+
+  /**
+   * Insert row values
+   */
+  private void insertRowValues(QueryContext queryContext,
+                               InsertNode insertNode, SubmitQueryResponse.Builder responseBuilder) {
     try {
       String nodeUniqName = insertNode.getTableName() == null ? new Path(insertNode.getUri()).getName() :
           insertNode.getTableName();
       String queryId = nodeUniqName + "_" + System.currentTimeMillis();
 
-      FileSystem fs = TajoConf.getWarehouseDir(context.getConf()).getFileSystem(context.getConf());
-      Path stagingDir = QueryMasterTask.initStagingDir(context.getConf(), queryId.toString(), queryContext);
-      Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+      URI finalOutputUri = insertNode.getUri();
+      Tablespace space = TablespaceManager.get(finalOutputUri).get();
+      TableMeta tableMeta = new TableMeta(insertNode.getStorageType(), insertNode.getOptions());
+      tableMeta.putOption(StorageConstants.INSERT_DIRECTLY, Boolean.TRUE.toString());
 
-      TableDesc tableDesc = null;
-      Path finalOutputDir;
-      if (insertNode.getTableName() != null) {
-        tableDesc = this.catalog.getTableDesc(insertNode.getTableName());
-        finalOutputDir = new Path(tableDesc.getUri());
-      } else {
-        finalOutputDir = new Path(insertNode.getUri());
-      }
+      FormatProperty formatProperty = space.getFormatProperty(tableMeta);
 
-      TaskAttemptContext taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
-      taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+      TaskAttemptContext taskAttemptContext;
+      if (formatProperty.directInsertSupported()) { // if this format and storage supports direct insertion
+        taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, null);
+        taskAttemptContext.setOutputPath(new Path(finalOutputUri));
 
-      EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
-      StoreTableExec exec = new StoreTableExec(taskAttemptContext, insertNode, evalExprExec);
-      try {
-        exec.init();
-        exec.next();
-      } finally {
-        exec.close();
-      }
+        EvalExprExec evalExprExec = new EvalExprExec(taskAttemptContext, (EvalExprNode) insertNode.getChild());
+        InsertRowsExec exec = new InsertRowsExec(taskAttemptContext, insertNode, evalExprExec);
 
-      if (insertNode.isOverwrite()) { // INSERT OVERWRITE INTO
-        // it moves the original table into the temporary location.
-        // Then it moves the new result table into the original table location.
-        // Upon failed, it recovers the original table if possible.
-        boolean movedToOldTable = false;
-        boolean committed = false;
-        Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
         try {
-          if (fs.exists(finalOutputDir)) {
-            fs.rename(finalOutputDir, oldTableDir);
-            movedToOldTable = fs.exists(oldTableDir);
-          } else { // if the parent does not exist, make its parent directory.
-            fs.mkdirs(finalOutputDir.getParent());
-          }
-          fs.rename(stagingResultDir, finalOutputDir);
-          committed = fs.exists(finalOutputDir);
-        } catch (IOException ioe) {
-          // recover the old table
-          if (movedToOldTable && !committed) {
-            fs.rename(oldTableDir, finalOutputDir);
-          }
+          exec.init();
+          exec.next();
+        } finally {
+          exec.close();
         }
       } else {
-        FileStatus[] files = fs.listStatus(stagingResultDir);
-        for (FileStatus eachFile : files) {
-          Path targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName());
-          if (fs.exists(targetFilePath)) {
-            targetFilePath = new Path(finalOutputDir, eachFile.getPath().getName() + "_" + System.currentTimeMillis());
-          }
-          fs.rename(eachFile.getPath(), targetFilePath);
-        }
+        URI stagingSpaceUri = space.prepareStagingSpace(context.getConf(), queryId, queryContext, tableMeta);
+        Path stagingDir = new Path(stagingSpaceUri);
+        Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
+
+        taskAttemptContext = new TaskAttemptContext(queryContext, null, null, null, stagingDir);
+        taskAttemptContext.setOutputPath(new Path(stagingResultDir, "part-01-000000"));
+        insertRowsThroughStaging(taskAttemptContext, insertNode, new Path(finalOutputUri), stagingDir, stagingResultDir);
       }
 
-      if (insertNode.hasTargetTable()) {
-        TableStats stats = tableDesc.getStats();
-        long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-        stats.setNumBytes(volume);
-        stats.setNumRows(1);
+      // set insert stats (how many rows and bytes)
+      TableStats stats = new TableStats();
+      stats.setNumBytes(taskAttemptContext.getResultStats().getNumBytes());
+      stats.setNumRows(taskAttemptContext.getResultStats().getNumRows());
 
+      if (insertNode.hasTargetTable()) {
         CatalogProtos.UpdateTableStatsProto.Builder builder = CatalogProtos.UpdateTableStatsProto.newBuilder();
-        builder.setTableName(tableDesc.getName());
+        builder.setTableName(insertNode.getTableName());
         builder.setStats(stats.getProto());
 
         catalog.updateTableStats(builder.build());
 
-        responseBuilder.setTableDesc(tableDesc.getProto());
-      } else {
-        TableStats stats = new TableStats();
-        long volume = Query.getTableVolume(context.getConf(), finalOutputDir);
-        stats.setNumBytes(volume);
-        stats.setNumRows(1);
+        TableDesc desc = new TableDesc(
+            insertNode.getTableName(),
+            insertNode.getTargetSchema(),
+            tableMeta,
+            finalOutputUri);
+        responseBuilder.setTableDesc(desc.getProto());
+
+      } else { // If INSERT INTO LOCATION
 
         // Empty TableDesc
         List<CatalogProtos.ColumnProto> columns = new ArrayList<CatalogProtos.ColumnProto>();
@@ -445,11 +479,12 @@ public class QueryExecutor {
     TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, plan.getRootBlock().getRoot());
     if (tableDesc != null) {
 
-      Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
-      StorageProperty storageProperty = space.getProperty();
+      Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
+      FormatProperty formatProperty = space.getFormatProperty(tableDesc.getMeta());
 
-      if (!storageProperty.isInsertable()) {
-        throw new VerifyException("Inserting into non-file storage is not supported.");
+      if (!formatProperty.isInsertable()) {
+        throw new VerifyException(
+            String.format("%s tablespace does not allow INSERT operation.", tableDesc.getUri().toString()));
       }
 
       space.prepareTable(rootNode.getChild());
@@ -487,7 +522,7 @@ public class QueryExecutor {
     TableDesc tableDesc = PlannerUtil.getTableDesc(planner.getCatalog(), rootNode.getChild());
 
     if (tableDesc != null) {
-      Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+      Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
       space.rewritePlan(context, plan);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
index d490001..0c02b6e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
@@ -24,8 +24,6 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.CreateTableNode;
 import org.apache.tajo.plan.logical.LogicalRootNode;
 import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.storage.TableSpaceManager;
-import org.apache.tajo.storage.Tablespace;
 
 public class CreateTableHook implements DistributedQueryHook {
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 4fef02c..9d5838d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -48,7 +48,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.util.history.QueryHistory;
@@ -447,7 +447,7 @@ public class Query implements EventHandler<QueryEvent> {
       QueryContext context = query.context.getQueryContext();
 
       if (lastStage != null && context.hasOutputTableUri()) {
-        Tablespace space = TableSpaceManager.get(context.getOutputTableUri()).get();
+        Tablespace space = TablespaceManager.get(context.getOutputTableUri()).get();
         try {
           LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
           space.rollbackTable(rootNode.getChild());
@@ -470,7 +470,7 @@ public class Query implements EventHandler<QueryEvent> {
 
         // If there is not tabledesc, it is a select query without insert or ctas.
         // In this case, we should use default tablespace.
-        Tablespace space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+        Tablespace space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
 
         Path finalOutputDir = space.commitTable(
             query.context.getQueryContext(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 84f2eac..1f5e7a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -18,15 +18,10 @@
 
 package org.apache.tajo.querymaster;
 
-import com.google.common.base.Optional;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -61,6 +56,7 @@ import org.apache.tajo.worker.AbstractResourceAllocator;
 import org.apache.tajo.worker.TajoResourceAllocator;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -70,12 +66,6 @@ import static org.apache.tajo.TajoProtos.QueryState;
 public class QueryMasterTask extends CompositeService {
   private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
 
-  // query submission directory is private!
-  final public static FsPermission STAGING_DIR_PERMISSION =
-      FsPermission.createImmutable((short) 0700); // rwx--------
-
-  public static final String TMP_STAGING_DIR_PREFIX = ".staging";
-
   private QueryId queryId;
 
   private Session session;
@@ -157,8 +147,6 @@ public class QueryMasterTask extends CompositeService {
       dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
       dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
 
-      initStagingDir();
-
       queryMetrics = new TajoMetrics(queryId.toString());
 
       super.init(systemConf);
@@ -303,8 +291,9 @@ public class QueryMasterTask extends CompositeService {
         state == QueryState.QUERY_ERROR;
   }
 
+  private LogicalPlan plan;
+
   public synchronized void startQuery() {
-    LogicalPlan plan = null;
     Tablespace space = null;
     try {
       if (query != null) {
@@ -314,7 +303,7 @@ public class QueryMasterTask extends CompositeService {
 
 
       CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
-      LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+      LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
       LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
       Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
       jsonExpr = null; // remove the possible OOM
@@ -322,10 +311,12 @@ public class QueryMasterTask extends CompositeService {
       plan = planner.createPlan(queryContext, expr);
       optimizer.optimize(queryContext, plan);
 
-      // when a given uri is null, TableSpaceManager.get will return the default tablespace.
-      space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+      // when a given uri is null, TablespaceManager.get will return the default tablespace.
+      space = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
       space.rewritePlan(queryContext, plan);
 
+      initStagingDir();
+
       for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
         LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
         if (scanNodes != null) {
@@ -367,94 +358,25 @@ public class QueryMasterTask extends CompositeService {
   }
 
   private void initStagingDir() throws IOException {
-    Path stagingDir;
+    URI stagingDir;
 
     try {
+      Tablespace tablespace = TablespaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+      TableDesc desc = PlannerUtil.getOutputTableDesc(plan);
 
-      stagingDir = initStagingDir(systemConf, queryId.toString(), queryContext);
+      FormatProperty formatProperty = tablespace.getFormatProperty(desc.getMeta());
+      if (formatProperty.isStagingSupport()) {
+        stagingDir = tablespace.prepareStagingSpace(systemConf, queryId.toString(), queryContext, desc.getMeta());
 
-      // Create a subdirectories
-      LOG.info("The staging dir '" + stagingDir + "' is created.");
-      queryContext.setStagingDir(stagingDir);
-    } catch (IOException ioe) {
-      LOG.warn("Creating staging dir has been failed.", ioe);
-
-      throw ioe;
-    }
-  }
-
-  /**
-   * It initializes the final output and staging directory and sets
-   * them to variables.
-   */
-  public static Path initStagingDir(TajoConf conf, String queryId, QueryContext context) throws IOException {
-
-    String realUser;
-    String currentUser;
-    UserGroupInformation ugi;
-    ugi = UserGroupInformation.getLoginUser();
-    realUser = ugi.getShortUserName();
-    currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
-
-    FileSystem fs;
-    Path stagingDir;
-
-    ////////////////////////////////////////////
-    // Create Output Directory
-    ////////////////////////////////////////////
-
-    String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
-
-    // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
-    // So, this query results won't be materialized as a part of a table.
-    // The result will be temporarily written in the staging directory.
-    if (outputPath.isEmpty()) {
-      // for temporarily written in the storage directory
-      stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
-    } else {
-      Optional<Tablespace> spaceResult = TableSpaceManager.get(outputPath);
-      if (!spaceResult.isPresent()) {
-        throw new IOException("No registered Tablespace for " + outputPath);
+        // Create a staging space
+        LOG.info("The staging dir '" + stagingDir + "' is created.");
+        queryContext.setStagingDir(stagingDir);
       }
 
-      Tablespace space = spaceResult.get();
-      if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
-        // If this space allows move operation, the staging directory will be underneath the final output table uri.
-        stagingDir = StorageUtil.concatPath(context.getOutputTableUri().toString(), TMP_STAGING_DIR_PREFIX, queryId);
-      } else {
-        stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
-      }
-    }
-
-    // initializ
-    fs = stagingDir.getFileSystem(conf);
-
-    if (fs.exists(stagingDir)) {
-      throw new IOException("The staging directory '" + stagingDir + "' already exists");
-    }
-    fs.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
-    FileStatus fsStatus = fs.getFileStatus(stagingDir);
-    String owner = fsStatus.getOwner();
-
-    if (!owner.isEmpty() && !(owner.equals(currentUser) || owner.equals(realUser))) {
-      throw new IOException("The ownership on the user's query " +
-          "directory " + stagingDir + " is not as expected. " +
-          "It is owned by " + owner + ". The directory must " +
-          "be owned by the submitter " + currentUser + " or " +
-          "by " + realUser);
-    }
-
-    if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
-      LOG.info("Permissions on staging directory " + stagingDir + " are " +
-          "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
-          "to correct value " + STAGING_DIR_PERMISSION);
-      fs.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+    } catch (IOException ioe) {
+      LOG.warn("Creating staging space has been failed.", ioe);
+      throw ioe;
     }
-
-    Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME);
-    fs.mkdirs(stagingResultDir);
-
-    return stagingDir;
   }
 
   public Query getQuery() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 5b8f24a..f30fb64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -100,7 +100,7 @@ public class Repartitioner {
           stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes();
         }
 
-        // TODO - We should remove dummy flagment usages
+        // TODO - We should remove dummy fragment usages
         fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path("/dummy"), 0, 0,
             new String[]{UNKNOWN_HOST});
 
@@ -115,7 +115,7 @@ public class Repartitioner {
         // if table has no data, tablespace will return empty FileFragment.
         // So, we need to handle FileFragment by its size.
         // If we don't check its size, it can cause IndexOutOfBoundsException.
-        Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+        Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
         List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc);
         if (fileFragments.size() > 0) {
           fragments[i] = fileFragments.get(0);
@@ -380,7 +380,7 @@ public class Repartitioner {
 
         Path[] partitionScanPaths = null;
         TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
-        Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+        Tablespace space = TablespaceManager.get(tableDesc.getUri()).get();
 
         if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
 
@@ -507,7 +507,7 @@ public class Repartitioner {
       Collection<Fragment> scanFragments;
       Path[] partitionScanPaths = null;
 
-      FileTablespace space = (FileTablespace) TableSpaceManager.get(desc.getUri()).get();
+      FileTablespace space = (FileTablespace) TablespaceManager.get(desc.getUri()).get();
 
       if (scan.getType() == NodeType.PARTITIONS_SCAN) {
         PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
@@ -645,7 +645,7 @@ public class Repartitioner {
             PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
       }
 
-      Tablespace space = TableSpaceManager.getAnyByScheme(storeType).get();
+      Tablespace space = TablespaceManager.getAnyByScheme(storeType).get();
       ranges = space.getInsertSortRanges(
           stage.getContext().getQueryContext(),
           tableDesc,

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index a7d605c..1163a6e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -60,7 +60,7 @@ import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.querymaster.Task.IntermediateEntry;
 import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
@@ -1084,7 +1084,7 @@ public class Stage implements EventHandler<StageEvent> {
       Collection<Fragment> fragments;
       TableMeta meta = table.getMeta();
 
-      Tablespace tablespace = TableSpaceManager.get(scan.getTableDesc().getUri()).get();
+      Tablespace tablespace = TablespaceManager.get(scan.getTableDesc().getUri()).get();
 
       // Depending on scanner node's type, it creates fragments. If scan is for
       // a partitioned table, It will creates lots fragments for all partitions.

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
index 0df5d4d..f97ce29 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
@@ -160,7 +160,7 @@ public class LegacyTaskImpl implements Task {
         this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
-      Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
+      Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get())
           .getAppenderFilePath(getId(), queryContext.getStagingDir());
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index 5974693..7697458 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -154,7 +154,7 @@ public class TaskImpl implements Task {
         this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
       }
     } else {
-      Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
+      Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get())
           .getAppenderFilePath(getId(), queryContext.getStagingDir());
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 43bb6c1..bd84283 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -27,7 +27,7 @@
 <%@ page import="org.apache.tajo.master.rm.WorkerState" %>
 <%@ page import="org.apache.tajo.service.ServiceTracker" %>
 <%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.storage.TableSpaceManager" %>
+<%@ page import="org.apache.tajo.storage.TablespaceManager" %>
 <%@ page import="org.apache.tajo.storage.Tablespace" %>
 <%@ page import="org.apache.tajo.util.NetUtils" %>
 <%@ page import="org.apache.tajo.util.TUtil" %>
@@ -141,7 +141,7 @@
   <h3>Tablespaces</h3>
   <table width="100%" class="border_table" border="1">
     <tr><th>Tablespace Name</th><th>URI</th><th>Handler</th></tr>
-    <% for (Tablespace space : TableSpaceManager.getAllTablespaces()) {
+    <% for (Tablespace space : TablespaceManager.getAllTablespaces()) {
       if (space.isVisible()) { %>
     <tr><td><%=space.getName()%></td><td><%=space.getUri()%></td><td><%=space.getClass().getName()%></td></tr>
     <% }}%>

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
index ca2378b..5df1122 100644
--- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -46,7 +46,7 @@ public class BackendTestingUtil {
 
   public static void writeTmpTable(TajoConf conf, Path tablePath)
       throws IOException {
-    FileTablespace sm = TableSpaceManager.getDefault();
+    FileTablespace sm = TablespaceManager.getDefault();
     Appender appender;
 
     Path filePath = new Path(tablePath, "table.csv");

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 57b1e18..a323f25 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -348,6 +348,41 @@ public class QueryTestCaseBase {
   }
 
   /**
+   * It executes the query file and compare the result against the the result file.
+   *
+   * @throws Exception
+   */
+  public void assertQuery() throws Exception {
+    ResultSet res = null;
+    try {
+      res = executeQuery();
+      assertResultSet(res);
+    } finally {
+      if (res != null) {
+        res.close();
+      }
+    }
+  }
+
+  /**
+   * It executes a given query statement and verifies the result against the the result file.
+   *
+   * @param query A query statement
+   * @throws Exception
+   */
+  public void assertQueryStr(String query) throws Exception {
+    ResultSet res = null;
+    try {
+      res = executeString(query);
+      assertResultSet(res);
+    } finally {
+      if (res != null) {
+        res.close();
+      }
+    }
+  }
+
+  /**
    * Execute a query contained in the file located in src/test/resources/results/<i>ClassName</i>/<i>MethodName</i>.
    * <i>ClassName</i> and <i>MethodName</i> will be replaced by actual executed class and methods.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index acdae85..973f1e8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -48,7 +48,7 @@ import org.apache.tajo.querymaster.Stage;
 import org.apache.tajo.querymaster.StageState;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
@@ -59,6 +59,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
@@ -346,18 +347,17 @@ public class TajoTestingCluster {
     LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
 
     if (!local) {
-      c.setVar(ConfVars.ROOT_DIR, getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
-    } else {
-      c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
-    }
+      String tajoRootDir = getMiniDFSCluster().getFileSystem().getUri().toString() + "/tajo";
+      c.setVar(ConfVars.ROOT_DIR, tajoRootDir);
 
-    // Do not need for local file system
-    if (!local) {
+      URI defaultTsUri = TajoConf.getWarehouseDir(c).toUri();
       FileTablespace defaultTableSpace =
-          new FileTablespace(TableSpaceManager.DEFAULT_TABLESPACE_NAME, TajoConf.getWarehouseDir(c).toUri());
+          new FileTablespace(TablespaceManager.DEFAULT_TABLESPACE_NAME, defaultTsUri);
       defaultTableSpace.init(conf);
+      TablespaceManager.addTableSpaceForTest(defaultTableSpace);
 
-      TableSpaceManager.addTableSpaceForTest(defaultTableSpace);
+    } else {
+      c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
     }
 
     setupCatalogForTesting(c, testBuildDir);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index ce951c6..26e25a4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -32,7 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.FileUtil;
 import org.junit.After;
 import org.junit.Before;
@@ -217,7 +217,7 @@ public class TestTajoCli {
 
     if (!cluster.isHiveCatalogStoreRunning()) {
       assertOutputResult(resultFileName, consoleResult, new String[]{"${table.path}"},
-        new String[]{TableSpaceManager.getDefault().getTableUri("default", tableName).toString()});
+        new String[]{TablespaceManager.getDefault().getTableUri("default", tableName).toString()});
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 328f883..07a09ad 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -24,7 +24,6 @@ import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.cli.tsql.InvalidStatementException;
 import org.apache.tajo.cli.tsql.ParsedResult;
 import org.apache.tajo.cli.tsql.SimpleParser;
@@ -50,7 +49,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
 import org.apache.tajo.plan.verifier.VerificationState;
 import org.apache.tajo.storage.LazyTuple;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.BytesUtils;
@@ -104,7 +103,7 @@ public class ExprTestBase {
 
     analyzer = new SQLAnalyzer();
     preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
-    planner = new LogicalPlanner(cat, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(cat, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(util.getConfiguration());
     annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 80f3459..5a8238c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -27,7 +27,6 @@ import org.apache.tajo.algebra.OpType;
 import org.apache.tajo.algebra.Selection;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
@@ -45,7 +44,7 @@ import org.apache.tajo.plan.function.GeneralFunction;
 import org.apache.tajo.plan.logical.GroupbyNode;
 import org.apache.tajo.plan.logical.NodeType;
 import org.apache.tajo.plan.nameresolver.NameResolvingMode;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.AfterClass;
@@ -117,7 +116,7 @@ public class TestEvalTreeUtil {
     catalog.createFunction(funcMeta);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 
     String[] QUERIES = {
         "select name, score, age from people where score > 30", // 0

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 9aa7ddf..afa3472 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -34,7 +34,7 @@ import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.junit.AfterClass;
@@ -103,7 +103,7 @@ public class TestLogicalOptimizer {
 
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(util.getConfiguration());
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 3cee816..dc9e2b0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.engine.planner;
 
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.graph.SimpleDirectedGraph;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
@@ -40,7 +40,7 @@ public class TestLogicalPlan {
   public static void setup() throws Exception {
     util = new TajoTestingCluster();
     util.startCatalogCluster();
-    planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TablespaceManager.getInstance());
   }
 
   public static void tearDown() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 351a6af..0f37763 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -42,7 +42,7 @@ import org.apache.tajo.plan.*;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;
@@ -131,7 +131,7 @@ public class TestLogicalPlanner {
 
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index d62eed2..fb35220 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -39,7 +39,7 @@ import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
@@ -109,7 +109,7 @@ public class TestPlannerUtil {
 
     catalog.createFunction(funcDesc);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 2464fb1..ace3d0d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -86,7 +86,7 @@ public class TestBNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, employeePath);
     appender.init();
     VTuple tuple = new VTuple(schema.size());
@@ -108,7 +108,7 @@ public class TestBNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
@@ -124,7 +124,7 @@ public class TestBNLJoinExec {
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 96a1f36..b4a6063 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -90,7 +90,7 @@ public class TestBSTIndexExec {
     Path workDir = CommonTestingUtil.getTestDir();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = TableSpaceManager.getLocalFs();
+    sm = TablespaceManager.getLocalFs();
 
     idxPath = new Path(workDir, "test.idx");
 
@@ -148,7 +148,7 @@ public class TestBSTIndexExec {
     catalog.createTable(desc);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index d94d3f6..cf5220e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -82,7 +82,7 @@ public class TestExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
@@ -104,7 +104,7 @@ public class TestExternalSortExec {
     employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
     catalog.createTable(employee);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index 21a101a..dc4dd04 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -104,7 +104,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     VTuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -133,7 +133,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     VTuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -172,7 +172,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     VTuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -224,7 +224,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
 
@@ -234,7 +234,7 @@ public class TestFullOuterHashJoinExec {
     catalog.createTable(phone3);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 0e2ce42..8fd61d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -109,7 +109,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     VTuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -147,7 +147,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV");
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path);
+    Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
     VTuple tuple4 = new VTuple(dep4Schema.size());
     for (int i = 0; i < 11; i++) {
@@ -178,7 +178,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     VTuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -217,7 +217,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     VTuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -269,7 +269,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
     appender5.flush();
@@ -278,7 +278,7 @@ public class TestFullOuterMergeJoinExec {
     catalog.createTable(phone3);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index d54df1c..1b64a8f 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -83,7 +83,7 @@ public class TestHashAntiJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     VTuple tuple = new VTuple(employeeSchema.size());
@@ -109,7 +109,7 @@ public class TestHashAntiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
@@ -128,7 +128,7 @@ public class TestHashAntiJoinExec {
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index a8826ee..b9ee06a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -85,7 +85,7 @@ public class TestHashJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     VTuple tuple = new VTuple(employeeSchema.size());
@@ -108,7 +108,7 @@ public class TestHashJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
@@ -126,7 +126,7 @@ public class TestHashJoinExec {
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
   }
 


[2/3] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces. (missed commits)

Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index ae90502..afa273b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -84,7 +84,7 @@ public class TestHashSemiJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     VTuple tuple = new VTuple(employeeSchema.size());
@@ -110,7 +110,7 @@ public class TestHashSemiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
@@ -133,7 +133,7 @@ public class TestHashSemiJoinExec {
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index bbb441c..c93a1b4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -105,7 +105,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     VTuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -134,7 +134,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     VTuple tuple2 = new VTuple(job3Schema.size());
@@ -174,7 +174,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -227,7 +227,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
     
@@ -239,7 +239,7 @@ public class TestLeftOuterHashJoinExec {
 
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index d0d0983..c4e7752 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -85,7 +85,7 @@ public class TestMergeJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     VTuple tuple = new VTuple(employeeSchema.size());
@@ -114,7 +114,7 @@ public class TestMergeJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
@@ -139,7 +139,7 @@ public class TestMergeJoinExec {
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 4866323..1b30ef8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -83,7 +83,7 @@ public class TestNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, employeePath);
     appender.init();
     VTuple tuple = new VTuple(schema.size());
@@ -107,7 +107,7 @@ public class TestNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
@@ -125,7 +125,7 @@ public class TestNLJoinExec {
     people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
     catalog.createTable(people);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 
     masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b2a228a..dff0cbe 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -98,7 +98,7 @@ public class TestPhysicalPlanner {
     util.startCatalogCluster();
     conf = util.getConfiguration();
     testDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestPhysicalPlanner");
-    sm = TableSpaceManager.getLocalFs();
+    sm = TablespaceManager.getLocalFs();
     catalog = util.getMiniCatalogCluster().getCatalog();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
@@ -164,7 +164,7 @@ public class TestPhysicalPlanner {
     appender.close();
     catalog.createTable(score);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
@@ -180,7 +180,7 @@ public class TestPhysicalPlanner {
 
     Schema scoreSchmea = score.getSchema();
     TableMeta scoreLargeMeta = CatalogUtil.newTableMeta("RAW", new KeyValueSet());
-    Appender appender =  ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender =  ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath);
     appender.enableStats();
     appender.init();
@@ -442,7 +442,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner =  ((FileTablespace) TableSpaceManager.getLocalFs())
+    Scanner scanner =  ((FileTablespace) TablespaceManager.getLocalFs())
         .getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
@@ -502,7 +502,7 @@ public class TestPhysicalPlanner {
     // checking the file contents
     long totalNum = 0;
     for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
-      Scanner scanner =  ((FileTablespace) TableSpaceManager.getLocalFs()).getFileScanner(
+      Scanner scanner =  ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner(
           CatalogUtil.newTableMeta("CSV"),
           rootNode.getOutSchema(),
           status.getPath());
@@ -539,7 +539,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = ((FileTablespace) TableSpaceManager.getLocalFs()).getFileScanner(
+    Scanner scanner = ((FileTablespace) TablespaceManager.getLocalFs()).getFileScanner(
         outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
     scanner.init();
     Tuple tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index 1b54948..d1da787 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -86,7 +86,7 @@ public class TestProgressExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta("RAW");
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
@@ -110,7 +110,7 @@ public class TestProgressExternalSortExec {
         employeePath.toUri());
     catalog.createTable(employee);
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index c956f29..f581db8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -100,7 +100,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     VTuple tuple = new VTuple(dep3Schema.size());
@@ -130,7 +130,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     VTuple tuple2 = new VTuple(job3Schema.size());
@@ -170,7 +170,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -212,7 +212,7 @@ public class TestRightOuterHashJoinExec {
     catalog.createTable(emp3);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 25f0ca4..d86b229 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -107,7 +107,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender1 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     VTuple tuple = new VTuple(dep3Schema.size());
@@ -146,7 +146,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep4Meta = CatalogUtil.newTableMeta("CSV");
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender4 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
     VTuple tuple4 = new VTuple(dep4Schema.size());
@@ -178,7 +178,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender2 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     VTuple tuple2 = new VTuple(job3Schema.size());
@@ -218,7 +218,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender3 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -271,7 +271,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender5 = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
 
@@ -281,7 +281,7 @@ public class TestRightOuterMergeJoinExec {
     catalog.createTable(phone3);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index ce12faf..4690e71 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -69,7 +69,7 @@ public class TestSortExec {
     util = TpchTestBase.getInstance().getTestingCluster();
     catalog = util.getMaster().getCatalog();
     workDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    sm = TableSpaceManager.getLocalFs();
+    sm = TablespaceManager.getLocalFs();
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -81,7 +81,7 @@ public class TestSortExec {
     tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
     sm.getFileSystem().mkdirs(tablePath.getParent());
 
-    Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
+    Appender appender = ((FileTablespace) TablespaceManager.getLocalFs())
         .getAppender(employeeMeta, schema, tablePath);
     appender.init();
     VTuple tuple = new VTuple(schema.size());
@@ -101,7 +101,7 @@ public class TestSortExec {
     catalog.createTable(desc);
 
     analyzer = new SQLAnalyzer();
-    planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index 3d2d857..569111c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@ -38,7 +38,7 @@ import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.plan.expr.*;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.hbase.*;
@@ -82,7 +82,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     tableSpaceUri = "hbase:zk://" + hostName + ":" + zkPort;
     HBaseTablespace hBaseTablespace = new HBaseTablespace("cluster1", URI.create(tableSpaceUri));
     hBaseTablespace.init(new TajoConf(testingCluster.getHBaseUtil().getConf()));
-    TableSpaceManager.addTableSpaceForTest(hBaseTablespace);
+    TablespaceManager.addTableSpaceForTest(hBaseTablespace);
   }
 
   @AfterClass
@@ -213,7 +213,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
     HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -253,7 +253,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
     HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -306,7 +306,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
     HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -343,7 +343,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
 
     assertTableExists("external_hbase_mapped_table");
 
-    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.getByName("cluster1").get();
+    HBaseTablespace space = (HBaseTablespace) TablespaceManager.getByName("cluster1").get();
     HConnection hconn = space.getConnection();
     HTableInterface htable = hconn.getTable("external_hbase_table");
 
@@ -477,7 +477,7 @@ public class TestHBaseTable extends QueryTestCaseBase {
     EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
         new ConstEval(new TextDatum("021")));
     scanNode.setQual(evalNodeEq);
-    Tablespace tablespace = TableSpaceManager.getByName("cluster1").get();
+    Tablespace tablespace = TablespaceManager.getByName("cluster1").get();
     List<Fragment> fragments = tablespace.getSplits("hbase_mapped_table", tableDesc, scanNode);
     assertEquals(1, fragments.size());
     assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
@@ -683,6 +683,48 @@ public class TestHBaseTable extends QueryTestCaseBase {
   }
 
   @Test
+  public void testInsertValues1() throws Exception {
+    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
+        "TABLESPACE cluster1 USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b')").close();
+
+    assertTableExists("hbase_mapped_table");
+    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
+
+    executeString("insert into hbase_mapped_table select 'aaa', 'a12', 'a34', 1").close();
+    executeString("insert into hbase_mapped_table select 'bbb', 'b12', 'b34', 2").close();
+    executeString("insert into hbase_mapped_table select 'ccc', 'c12', 'c34', 3").close();
+    executeString("insert into hbase_mapped_table select 'ddd', 'd12', 'd34', 4").close();
+
+    HTable htable = null;
+    ResultScanner scanner = null;
+    try {
+      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
+
+      Scan scan = new Scan();
+      scan.addFamily(Bytes.toBytes("col1"));
+      scan.addFamily(Bytes.toBytes("col2"));
+      scan.addFamily(Bytes.toBytes("col3"));
+      scanner = htable.getScanner(scan);
+
+      assertStrings(resultSetToString(scanner,
+          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
+          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
+          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
+
+    } finally {
+      executeString("DROP TABLE hbase_mapped_table PURGE").close();
+
+      if (scanner != null) {
+        scanner.close();
+      }
+
+      if (htable != null) {
+        htable.close();
+      }
+    }
+  }
+
+  @Test
   public void testInsertIntoMultiRegion() throws Exception {
     executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) TABLESPACE cluster1 " +
         "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
@@ -1301,10 +1343,8 @@ public class TestHBaseTable extends QueryTestCaseBase {
     }
   }
 
-  private String resultSetToString(ResultScanner scanner,
-                                   byte[][] cfNames, byte[][] qualifiers,
-                                   boolean[] binaries,
-                                   Schema schema) throws Exception {
+  private String resultSetToString(ResultScanner scanner, byte[][] cfNames, byte[][] qualifiers,
+                                   boolean [] binaries, Schema schema) throws Exception {
     StringBuilder sb = new StringBuilder();
     Result result = null;
     while ( (result = scanner.next()) != null ) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 1478690..dd67e06 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -264,7 +264,7 @@ public class TestJoinQuery extends QueryTestCaseBase {
         }
         Path dataPath = new Path(table.getUri().toString(), fileIndex + ".csv");
         fileIndex++;
-        appender = (((FileTablespace)TableSpaceManager.getLocalFs()))
+        appender = (((FileTablespace) TablespaceManager.getLocalFs()))
             .getAppender(tableMeta, schema, dataPath);
         appender.init();
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
index c714749..265f075 100644
--- a/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java
@@ -20,10 +20,13 @@ package org.apache.tajo.ha;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
 import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.service.ServiceTracker;
@@ -32,58 +35,51 @@ import org.junit.Test;
 
 import static junit.framework.Assert.assertTrue;
 import static junit.framework.TestCase.assertEquals;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotEquals;
 
 public class TestHAServiceHDFSImpl  {
   private TajoTestingCluster cluster;
-  private TajoMaster backupMaster;
 
-  private TajoConf conf;
-  private TajoClient client;
+  private TajoMaster primaryMaster;
+  private TajoMaster backupMaster;
 
   private Path haPath, activePath, backupPath;
 
-  private String masterAddress;
-
   @Test
   public final void testAutoFailOver() throws Exception {
-    cluster = new TajoTestingCluster(true);
-
-    cluster.startMiniCluster(1);
-    conf = cluster.getConfiguration();
-    client = cluster.newTajoClient();
+    cluster = TpchTestBase.getInstance().getTestingCluster();
 
     try {
       FileSystem fs = cluster.getDefaultFileSystem();
 
-      ServiceTracker serviceTracker = ServiceTrackerFactory.get(conf);
-      masterAddress = serviceTracker.getUmbilicalAddress().getHostName();
-
-      setConfiguration();
+      TajoConf primaryConf = setConfigForHAMaster();
+      primaryMaster = new TajoMaster();
+      primaryMaster.init(primaryConf);
+      primaryMaster.start();
 
+      TajoConf backupConf = setConfigForHAMaster();
       backupMaster = new TajoMaster();
-      backupMaster.init(conf);
+      backupMaster.init(backupConf);
       backupMaster.start();
 
-      assertNotEquals(cluster.getMaster().getMasterName(), backupMaster.getMasterName());
+      ServiceTracker tracker = ServiceTrackerFactory.get(primaryConf);
 
+      assertNotEquals(primaryMaster.getMasterName(), backupMaster.getMasterName());
       verifySystemDirectories(fs);
 
       assertEquals(2, fs.listStatus(activePath).length);
       assertEquals(1, fs.listStatus(backupPath).length);
 
       assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE)));
-      assertTrue(fs.exists(new Path(activePath, cluster.getMaster().getMasterName().replaceAll(":", "_"))));
+      assertTrue(fs.exists(new Path(activePath, primaryMaster.getMasterName().replaceAll(":", "_"))));
       assertTrue(fs.exists(new Path(backupPath, backupMaster.getMasterName().replaceAll(":", "_"))));
 
-      createDatabaseAndTable();
-      verifyDataBaseAndTable();
-      client.close();
+      createDatabaseAndTable(tracker);
+      verifyDataBaseAndTable(tracker);
 
-      cluster.getMaster().stop();
+      primaryMaster.stop();
 
-      client = cluster.newTajoClient();
-      verifyDataBaseAndTable();
+      verifyDataBaseAndTable(tracker);
 
       assertEquals(2, fs.listStatus(activePath).length);
       assertEquals(0, fs.listStatus(backupPath).length);
@@ -91,25 +87,23 @@ public class TestHAServiceHDFSImpl  {
       assertTrue(fs.exists(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE)));
       assertTrue(fs.exists(new Path(activePath, backupMaster.getMasterName().replaceAll(":", "_"))));
     } finally {
-      client.close();
       backupMaster.stop();
-      cluster.shutdownMiniCluster();
     }
   }
 
-  private void setConfiguration() {
-    conf = cluster.getConfiguration();
+  private TajoConf setConfigForHAMaster() {
+    TajoConf conf = new TajoConf(cluster.getConfiguration());
 
     conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
-      masterAddress + ":" + NetUtils.getFreeSocketPort());
+        "localhost:" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS,
-      masterAddress + ":" + NetUtils.getFreeSocketPort());
+        "localhost:" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS,
-      masterAddress + ":" + NetUtils.getFreeSocketPort());
+        "localhost:" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS,
-      masterAddress + ":" + NetUtils.getFreeSocketPort());
+        "localhost:" + NetUtils.getFreeSocketPort());
     conf.setVar(TajoConf.ConfVars.TAJO_MASTER_INFO_ADDRESS,
-        masterAddress + ":" + NetUtils.getFreeSocketPort());
+        "localhost:" + NetUtils.getFreeSocketPort());
     conf.setIntVar(TajoConf.ConfVars.REST_SERVICE_PORT,
         NetUtils.getFreeSocketPort());
 
@@ -126,6 +120,8 @@ public class TestHAServiceHDFSImpl  {
     conf.setIntVar(TajoConf.ConfVars.WORKER_RPC_SERVER_WORKER_THREAD_NUM, 2);
     conf.setIntVar(TajoConf.ConfVars.CATALOG_RPC_SERVER_WORKER_THREAD_NUM, 2);
     conf.setIntVar(TajoConf.ConfVars.SHUFFLE_RPC_SERVER_WORKER_THREAD_NUM, 2);
+
+    return conf;
   }
 
   private void verifySystemDirectories(FileSystem fs) throws Exception {
@@ -139,14 +135,26 @@ public class TestHAServiceHDFSImpl  {
     assertTrue(fs.exists(backupPath));
   }
 
-  private void createDatabaseAndTable() throws Exception {
-    client.executeQuery("CREATE TABLE default.table1 (age int);");
-    client.executeQuery("CREATE TABLE default.table2 (age int);");
+  private void createDatabaseAndTable(ServiceTracker tracker) throws Exception {
+    TajoClient client = null;
+    try {
+      client = new TajoClientImpl(tracker);
+      client.executeQuery("CREATE TABLE default.ha_test1 (age int);");
+      client.executeQuery("CREATE TABLE default.ha_test2 (age int);");
+    } finally {
+      IOUtils.cleanup(null, client);
+    }
   }
 
-  private void verifyDataBaseAndTable() throws Exception {
-    client.existDatabase("default");
-    client.existTable("default.table1");
-    client.existTable("default.table2");
+  private void verifyDataBaseAndTable(ServiceTracker tracker) throws Exception {
+    TajoClient client = null;
+    try {
+      client = new TajoClientImpl(tracker);
+      client.existDatabase("default");
+      client.existTable("default.ha_test1");
+      client.existTable("default.ha_test2");
+    } finally {
+      IOUtils.cleanup(null, client);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index fc25c27..3d32c08 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -66,7 +66,7 @@ public class TestResultSet {
   public static void setup() throws Exception {
     util = TpchTestBase.getInstance().getTestingCluster();
     conf = util.getConfiguration();
-    sm = TableSpaceManager.getDefault();
+    sm = TablespaceManager.getDefault();
 
     scoreSchema = new Schema();
     scoreSchema.addColumn("deptname", Type.TEXT);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 48966bc..7c61cc7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -34,7 +34,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.plan.LogicalOptimizer;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.LogicalPlanner;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -79,7 +79,7 @@ public class TestExecutionBlockCursor {
     }
 
     analyzer = new SQLAnalyzer();
-    logicalPlanner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    logicalPlanner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     optimizer = new LogicalOptimizer(conf);
 
     dispatcher = new AsyncDispatcher();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
index 6322732..e8d59d0 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java
@@ -22,18 +22,17 @@ import static org.junit.Assert.*;
 import static org.hamcrest.CoreMatchers.*;
 
 import java.io.File;
+import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.ResultSetUtil;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
@@ -62,256 +61,21 @@ import org.junit.Test;
 
 import com.google.protobuf.ByteString;
 
-public class TestNonForwardQueryResultSystemScanner {
-  
-  private class CollectionMatcher<T> extends TypeSafeDiagnosingMatcher<Iterable<? extends T>> {
-    
-    private final Matcher<? extends T> matcher;
-    
-    public CollectionMatcher(Matcher<? extends T> matcher) {
-      this.matcher = matcher;
-    }
-
-    @Override
-    public void describeTo(Description description) {
-      description.appendText("a collection containing ").appendDescriptionOf(this.matcher);
-    }
-
-    @Override
-    protected boolean matchesSafely(Iterable<? extends T> item, Description mismatchDescription) {
-      boolean isFirst = true;
-      Iterator<? extends T> iterator = item.iterator();
-      
-      while (iterator.hasNext()) {
-        T obj = iterator.next();
-        if (this.matcher.matches(obj)) {
-          return true;
-        }
-        
-        if (!isFirst) {
-          mismatchDescription.appendText(", ");
-        }
-        
-        this.matcher.describeMismatch(obj, mismatchDescription);
-        isFirst = false;
-      }
-      return false;
-    }
-    
-  }
-  
-  private <T> Matcher<Iterable<? extends T>> hasItem(Matcher<? extends T> matcher) {
-    return new CollectionMatcher<T>(matcher);
-  }
-
-  private static LocalTajoTestingUtility testUtil;
-  private static TajoTestingCluster testingCluster;
-  private static TajoConf conf;
-  private static MasterContext masterContext;
-  
-  private static SQLAnalyzer analyzer;
-  private static LogicalPlanner logicalPlanner;
-  private static LogicalOptimizer logicalOptimizer;
-  
-  private static void setupTestingCluster() throws Exception {
-    testUtil = new LocalTajoTestingUtility();
-    String[] names, paths;
-    Schema[] schemas;
-    
-    TPCH tpch = new TPCH();
-    tpch.loadSchemas();
-    tpch.loadQueries();
-    
-    names = new String[] {"customer", "lineitem", "nation", "orders", "part", "partsupp", 
-        "region", "supplier", "empty_orders"};
-    schemas = new Schema[names.length];
-    for (int i = 0; i < names.length; i++) {
-      schemas[i] = tpch.getSchema(names[i]);
-    }
-
-    File file;
-    paths = new String[names.length];
-    for (int i = 0; i < names.length; i++) {
-      file = new File("src/test/tpch/" + names[i] + ".tbl");
-      if(!file.exists()) {
-        file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i]
-            + ".tbl");
-      }
-      paths[i] = file.getAbsolutePath();
-    }
-    
-    KeyValueSet opt = new KeyValueSet();
-    opt.set(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
-    testUtil.setup(names, paths, schemas, opt);
-    
-    testingCluster = testUtil.getTestingCluster();
-  }
-  
-  @BeforeClass
-  public static void setUp() throws Exception {
-    setupTestingCluster();
-    
-    conf = testingCluster.getConfiguration();
-    masterContext = testingCluster.getMaster().getContext();
-    
-    GlobalEngine globalEngine = masterContext.getGlobalEngine();
-    analyzer = globalEngine.getAnalyzer();
-    logicalPlanner = globalEngine.getLogicalPlanner();
-    logicalOptimizer = globalEngine.getLogicalOptimizer();
-  }
-  
-  @AfterClass
-  public static void tearDown() throws Exception {
-    try {
-      Thread.sleep(2000);
-    } catch (Exception ignored) {
-    }
-    
-    testUtil.shutdown();
-  }
-  
-  private NonForwardQueryResultScanner getScanner(String sql) throws Exception {
-    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    String sessionId = UUID.randomUUID().toString();
-    
-    return getScanner(sql, queryId, sessionId);
-  }
-  
-  private NonForwardQueryResultScanner getScanner(String sql, QueryId queryId, String sessionId) throws Exception {
-    QueryContext queryContext = LocalTajoTestingUtility.createDummyContext(conf);
-    
-    Expr expr = analyzer.parse(sql);
-    LogicalPlan logicalPlan = logicalPlanner.createPlan(queryContext, expr);
-    logicalOptimizer.optimize(logicalPlan);
-    
-    int maxRow = Integer.MAX_VALUE;
-    if (logicalPlan.getRootBlock().hasNode(NodeType.LIMIT)) {
-      LimitNode limitNode = logicalPlan.getRootBlock().getNode(NodeType.LIMIT);
-      maxRow = (int) limitNode.getFetchFirstNum();
-    }
-    
-    NonForwardQueryResultScanner queryResultScanner = 
-        new NonForwardQueryResultSystemScanner(masterContext, logicalPlan, queryId,
-            sessionId, maxRow);
-    
-    return queryResultScanner;
-  }
-  
-  @Test
-  public void testInit() throws Exception {
-    QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
-    String sessionId = UUID.randomUUID().toString();
-    NonForwardQueryResultScanner queryResultScanner = 
-        getScanner("SELECT SPACE_ID, SPACE_URI FROM INFORMATION_SCHEMA.TABLESPACE",
-            queryId, sessionId);
-    
-    queryResultScanner.init();
-    
-    assertThat(queryResultScanner.getQueryId(), is(notNullValue()));
-    assertThat(queryResultScanner.getLogicalSchema(), is(notNullValue()));
-    assertThat(queryResultScanner.getSessionId(), is(notNullValue()));
-    assertThat(queryResultScanner.getTableDesc(), is(notNullValue()));
-    
-    assertThat(queryResultScanner.getQueryId(), is(queryId));
-    assertThat(queryResultScanner.getSessionId(), is(sessionId));
-    
-    assertThat(queryResultScanner.getLogicalSchema().size(), is(2));
-    assertThat(queryResultScanner.getLogicalSchema().getColumn("space_id"), is(notNullValue()));
-  }
-  
-  private List<Tuple> getTupleList(RowStoreDecoder decoder, List<ByteString> bytes) {
-    List<Tuple> tuples = new ArrayList<Tuple>(bytes.size());
-    
-    for (ByteString byteString: bytes) {
-      Tuple aTuple = decoder.toTuple(byteString.toByteArray());
-      tuples.add(aTuple);
-    }
-    
-    return tuples;
-  }
-  
-  private <T> Matcher<Tuple> getTupleMatcher(final int fieldId, final Matcher<T> matcher) {
-    return new TypeSafeDiagnosingMatcher<Tuple>() {
-
-      @Override
-      public void describeTo(Description description) {
-        description.appendDescriptionOf(matcher);
-      }
-
-      @Override
-      protected boolean matchesSafely(Tuple item, Description mismatchDescription) {
-        Object itemValue = null;
-
-        Type type = item.type(fieldId);
-        if (type == Type.TEXT) {
-          itemValue = item.getText(fieldId);
-        } else if (type == Type.INT4) {
-          itemValue = item.getInt4(fieldId);
-        } else if (type == Type.INT8) {
-          itemValue = item.getInt8(fieldId);
-        }
-        
-        if (itemValue != null && matcher.matches(itemValue)) {
-          return true;
-        }
-        
-        matcher.describeMismatch(itemValue, mismatchDescription);
-        return false;
-      }
-    };
-  }
-  
+public class TestNonForwardQueryResultSystemScanner extends QueryTestCaseBase {
   @Test
   public void testGetNextRowsForAggregateFunction() throws Exception {
-    NonForwardQueryResultScanner queryResultScanner = 
-        getScanner("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES");
-    
-    queryResultScanner.init();
-    
-    List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
-    
-    assertThat(rowBytes.size(), is(1));
-    
-    RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
-    List<Tuple> tuples = getTupleList(decoder, rowBytes);
-    
-    assertThat(tuples.size(), is(1));
-    assertThat(tuples, hasItem(getTupleMatcher(0, is(9L))));
+    assertQueryStr("SELECT COUNT(*) FROM INFORMATION_SCHEMA.TABLES " +
+        "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'");
   }
-  
+
   @Test
   public void testGetNextRowsForTable() throws Exception {
-    NonForwardQueryResultScanner queryResultScanner =
-        getScanner("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES");
-    
-    queryResultScanner.init();
-    
-    List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
-    
-    assertThat(rowBytes.size(), is(9));
-    
-    RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
-    List<Tuple> tuples = getTupleList(decoder, rowBytes);;
-    
-    assertThat(tuples.size(), is(9));
-    assertThat(tuples, hasItem(getTupleMatcher(0, is("lineitem"))));
+    assertQueryStr("SELECT TABLE_NAME, TABLE_TYPE FROM INFORMATION_SCHEMA.TABLES " +
+        "WHERE TABLE_NAME = 'lineitem' OR TABLE_NAME = 'nation' OR TABLE_NAME = 'customer'");
   }
-  
+
   @Test
   public void testGetClusterDetails() throws Exception {
-    NonForwardQueryResultScanner queryResultScanner =
-        getScanner("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER");
-    
-    queryResultScanner.init();
-    
-    List<ByteString> rowBytes = queryResultScanner.getNextRows(100);
-    
-    assertThat(rowBytes.size(), is(2));
-    
-    RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema());
-    List<Tuple> tuples = getTupleList(decoder, rowBytes);
-    
-    assertThat(tuples.size(), is(2));
-    assertThat(tuples, hasItem(getTupleMatcher(0, is("QueryMaster"))));
+    assertQueryStr("SELECT TYPE FROM INFORMATION_SCHEMA.CLUSTER");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index edddc5a..1351716 100644
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@ -45,7 +45,7 @@ import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.session.Session;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.history.HistoryReader;
 import org.apache.tajo.util.history.HistoryWriter;
@@ -105,7 +105,7 @@ public class TestKillQuery {
     Session session = LocalTajoTestingUtility.createDummySession();
     CatalogService catalog = cluster.getMaster().getCatalog();
 
-    LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     LogicalOptimizer optimizer = new LogicalOptimizer(conf);
     Expr expr =  analyzer.parse(queryStr);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
@@ -169,7 +169,7 @@ public class TestKillQuery {
     Session session = LocalTajoTestingUtility.createDummySession();
     CatalogService catalog = cluster.getMaster().getCatalog();
 
-    LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
+    LogicalPlanner planner = new LogicalPlanner(catalog, TablespaceManager.getInstance());
     LogicalOptimizer optimizer = new LogicalOptimizer(conf);
     Expr expr =  analyzer.parse(queryStr);
     LogicalPlan plan = planner.createPlan(defaultContext, expr);

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 863c7b5..f48a71e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -67,7 +67,7 @@ public class TestRowFile {
 
     TableMeta meta = CatalogUtil.newTableMeta("ROWFILE");
 
-    FileTablespace sm = (FileTablespace) TableSpaceManager.get(cluster.getDefaultFileSystem().getUri()).get();
+    FileTablespace sm = (FileTablespace) TablespaceManager.get(cluster.getDefaultFileSystem().getUri()).get();
 
     Path tablePath = new Path("/test");
     Path metaPath = new Path(tablePath, ".meta");

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result
new file mode 100644
index 0000000..45d730a
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestHBaseTable/testInsertValues1.result
@@ -0,0 +1,4 @@
+aaa, a12, {"": "a34"}, 1
+bbb, b12, {"": "b34"}, 2
+ccc, c12, {"": "c34"}, 3
+ddd, d12, {"": "d34"}, 4

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result
new file mode 100644
index 0000000..9f12294
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetClusterDetails.result
@@ -0,0 +1,4 @@
+type
+-------------------------------
+QueryMaster
+Worker
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result
new file mode 100644
index 0000000..07dd98b
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForAggregateFunction.result
@@ -0,0 +1,3 @@
+?count_2
+-------------------------------
+3
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result
new file mode 100644
index 0000000..fd37504
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestNonForwardQueryResultSystemScanner/testGetNextRowsForTable.result
@@ -0,0 +1,5 @@
+table_name,table_type
+-------------------------------
+customer,EXTERNAL
+lineitem,EXTERNAL
+nation,EXTERNAL
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
index 17f79da..22f4781 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlan.java
@@ -239,6 +239,14 @@ public class LogicalPlan {
     return queryBlocks.get(ROOT_BLOCK);
   }
 
+  public LogicalRootNode getRootNode() {
+    return queryBlocks.get(ROOT_BLOCK).getRoot();
+  }
+
+  public Schema getOutputSchema() {
+    return getRootNode().getOutSchema();
+  }
+
   public QueryBlock getBlock(String blockName) {
     return queryBlocks.get(blockName);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index 16ca368..441e047 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -910,6 +910,22 @@ public class PlannerUtil {
     }
   }
 
+  public static TableDesc getOutputTableDesc(LogicalPlan plan) {
+    LogicalNode [] found = findAllNodes(plan.getRootNode().getChild(), NodeType.CREATE_TABLE, NodeType.INSERT);
+
+    if (found.length == 0) {
+      return new TableDesc(null, plan.getRootNode().getOutSchema(), "TEXT", new KeyValueSet(), null);
+    } else {
+      StoreTableNode storeNode = (StoreTableNode) found[0];
+      return new TableDesc(
+          storeNode.getTableName(),
+          storeNode.getOutSchema(),
+          storeNode.getStorageType(),
+          storeNode.getOptions(),
+          storeNode.getUri());
+    }
+  }
+
   public static TableDesc getTableDesc(CatalogService catalog, LogicalNode node) throws IOException {
     if (node.getType() == NodeType.ROOT) {
       node = ((LogicalRootNode)node).getChild();

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
index 0f0cd10..547a6f2 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
@@ -18,14 +18,46 @@
 
 package org.apache.tajo.storage;
 
+/**
+ * Format properties
+ */
 public class FormatProperty {
-  private boolean sortedInsertRequired;
 
-  public FormatProperty(boolean sortedInsertRequired) {
-    this.sortedInsertRequired = sortedInsertRequired;
+  /** if this format supports insert operation */
+  private boolean insertable;
+  /** if this format supports direct insertion (e.g., HBASE or JDBC-based storages) */
+  private boolean directInsert;
+  /** if this format supports staging phase */
+  private boolean stagingSupport;
+
+  public FormatProperty(boolean insertable, boolean directInsert, boolean stagingSupport) {
+    this.insertable = insertable;
+    this.stagingSupport = stagingSupport;
+    this.directInsert = directInsert;
+  }
+
+  /**
+   * Return if this format supports staging phase
+   * @return True if this format supports staging phase
+   */
+  public boolean isInsertable() {
+    return insertable;
+  }
+
+  /**
+   * Return if this format supports direct insertion (e.g., HBASE or JDBC-based storages)
+   * @return True if this format supports direct insertion
+   */
+  public boolean directInsertSupported() {
+    return directInsert;
   }
 
-  public boolean sortedInsertRequired() {
-    return sortedInsertRequired;
+  /**
+   * Return if this format supports staging phase
+   *
+   * @return True if this format supports staging phase
+   */
+  public boolean isStagingSupport() {
+    return stagingSupport;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index ce573be..67a2f86 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -64,7 +64,7 @@ public class MergeScanner implements Scanner {
 
     long numBytes = 0;
     for (Fragment eachFileFragment: rawFragmentList) {
-      long fragmentLength = TableSpaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment);
+      long fragmentLength = TablespaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment);
       if (fragmentLength > 0) {
         numBytes += fragmentLength;
         fragments.add(eachFileFragment);
@@ -131,7 +131,7 @@ public class MergeScanner implements Scanner {
   private Scanner getNextScanner() throws IOException {
     if (iterator.hasNext()) {
       currentFragment = iterator.next();
-      currentScanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target);
+      currentScanner = TablespaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target);
       currentScanner.init();
       return currentScanner;
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
index 12b236f..ef33a8e 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
@@ -31,7 +31,6 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.FileUtil;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -158,7 +157,7 @@ public class OldStorageManager {
           Constructor<? extends Tablespace> constructor =
               (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
           if (constructor == null) {
-            constructor = storageManagerClass.getDeclaredConstructor(TableSpaceManager.TABLESPACE_PARAM);
+            constructor = storageManagerClass.getDeclaredConstructor(TablespaceManager.TABLESPACE_PARAM);
             constructor.setAccessible(true);
             CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
           }

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
index 38d0734..c1db34e 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
@@ -18,20 +18,39 @@
 
 package org.apache.tajo.storage;
 
+/**
+ * Storage Properties
+ */
 public class StorageProperty {
-  private boolean movable;
-  private boolean writable;
-  private boolean insertable;
-  private boolean absolutePathAllowed;
+  /** default file format */
+  private final String defaultFormat;
+  /** if this storage supports move operator */
+  private final boolean movable;
+  /** if this storage supports is writable */
+  private final boolean writable;
+  /** if this storage allows use of artibrary paths */
+  private final boolean absolutePathAllowed;
+
+  public StorageProperty(String defaultFormat,
+                         boolean movable,
+                         boolean writable,
+                         boolean absolutePathAllowed) {
 
-  public StorageProperty(boolean movable, boolean writable, boolean isInsertable, boolean absolutePathAllowed) {
+    this.defaultFormat = defaultFormat;
     this.movable = movable;
     this.writable = writable;
-    this.insertable = isInsertable;
     this.absolutePathAllowed = absolutePathAllowed;
   }
 
   /**
+   * Return default file format
+   * @return Default file format
+   */
+  public String defaultFormat() {
+    return defaultFormat;
+  }
+
+  /**
    * Move-like operation is allowed
    *
    * @return true if move operation is available
@@ -50,18 +69,9 @@ public class StorageProperty {
   }
 
   /**
-   * this storage supports insert operation?
-   *
-   * @return true if insert operation is allowed.
-   */
-  public boolean isInsertable() {
-    return insertable;
-  }
-
-  /**
    * Does this storage allows the use of arbitrary absolute paths outside tablespace?
    *
-   * @return
+   * @return True if this storage allows accesses to artibrary paths.
    */
   public boolean isArbitraryPathAllowed() {
     return this.absolutePathAllowed;

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
deleted file mode 100644
index ef04509..0000000
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
+++ /dev/null
@@ -1,390 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import net.minidev.json.JSONObject;
-import net.minidev.json.parser.JSONParser;
-import net.minidev.json.parser.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.Pair;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.URI;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
-
-/**
- * It handles available table spaces and cache TableSpace instances.
- *
- * Default tablespace must be a filesystem-based one.
- * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
- * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
- */
-public class TableSpaceManager implements StorageService {
-  private static final Log LOG = LogFactory.getLog(TableSpaceManager.class);
-
-  public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
-  public static final String SITE_CONFIG_FILE = "storage-site.json";
-
-  /** default tablespace name */
-  public static final String DEFAULT_TABLESPACE_NAME = "default";
-
-  private final static TajoConf systemConf = new TajoConf();
-  private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
-
-  // The relation ship among name, URI, Tablespaces must be kept 1:1:1.
-  protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
-  protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
-
-  protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
-  protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
-
-  public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
-
-  static {
-    instance = new TableSpaceManager();
-  }
-  /**
-   * Singleton instance
-   */
-  private static final TableSpaceManager instance;
-
-  private TableSpaceManager() {
-    initForDefaultConfig(); // loading storage-default.json
-    initSiteConfig();       // storage-site.json will override the configs of storage-default.json
-    addWarehouseAsSpace();  // adding a warehouse directory for a default tablespace
-    addLocalFsTablespace(); // adding a tablespace using local file system by default
-  }
-
-  private void addWarehouseAsSpace() {
-    Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
-    registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
-  }
-
-  private void addLocalFsTablespace() {
-    if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) {
-      String tmpName = UUID.randomUUID().toString();
-      registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
-    }
-  }
-
-  public static TableSpaceManager getInstance() {
-    return instance;
-  }
-
-  private void initForDefaultConfig() {
-    JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
-    if (json == null) {
-      throw new IllegalStateException("There is no " + SITE_CONFIG_FILE);
-    }
-    applyConfig(json, false);
-  }
-
-  private void initSiteConfig() {
-    JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
-
-    // if there is no storage-site.json file, nothing happen.
-    if (json != null) {
-      applyConfig(json, true);
-    }
-  }
-
-  private JSONObject loadFromConfig(String fileName) {
-    String json;
-    try {
-      json = FileUtil.readTextFileFromResource(fileName);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    if (json != null) {
-      return parseJson(json);
-    } else {
-      return null;
-    }
-  }
-
-  private static JSONObject parseJson(String json) {
-    try {
-      return (JSONObject) parser.parse(json);
-    } catch (ParseException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void applyConfig(JSONObject json, boolean override) {
-    loadStorages(json);
-    loadTableSpaces(json, override);
-  }
-
-  private void loadStorages(JSONObject json) {
-    JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
-
-    if (spaces != null) {
-      Pair<String, Class<? extends Tablespace>> pair = null;
-      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
-
-        try {
-          pair = extractStorage(entry);
-        } catch (ClassNotFoundException e) {
-          LOG.warn(e);
-          continue;
-        }
-
-        TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
-      }
-    }
-  }
-
-  private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
-      throws ClassNotFoundException {
-
-    String storageType = entry.getKey();
-    JSONObject storageDesc = (JSONObject) entry.getValue();
-    String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
-
-    return new Pair<String, Class<? extends Tablespace>>(
-        storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
-  }
-
-  private void loadTableSpaces(JSONObject json, boolean override) {
-    JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
-
-    if (spaces != null) {
-      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
-        AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
-      }
-    }
-  }
-
-  public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
-    boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
-    URI spaceUri = URI.create(spaceDesc.getAsString("uri"));
-
-    if (defaultSpace) {
-      registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
-    }
-    registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
-  }
-
-  private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
-                                         boolean visible, boolean override) {
-    Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
-    tableSpace.setVisible(visible);
-
-    try {
-      tableSpace.init(systemConf);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    putTablespace(tableSpace, override);
-
-    // If the arbitrary path is allowed, root uri is also added as a tablespace
-    if (tableSpace.getProperty().isArbitraryPathAllowed()) {
-      URI rootUri = tableSpace.getRootUri();
-      // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
-      if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
-        String tmpName = UUID.randomUUID().toString();
-        registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
-      }
-    }
-  }
-
-  private static void putTablespace(Tablespace space, boolean override) {
-    // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
-
-    boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
-    boolean uriExist = TABLE_SPACES.containsKey(space.uri);
-
-    boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
-    mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space);
-
-    if (!override && mismatch) {
-      throw new RuntimeException("Name or URI of Tablespace must be unique.");
-    }
-
-    SPACES_URIS_MAP.put(space.getName(), space.getUri());
-    // We must guarantee that the same uri results in the same tablespace instance.
-    TABLE_SPACES.put(space.getUri(), space);
-  }
-
-  /**
-   * Return length of the fragment.
-   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
-   *
-   * @param conf Tajo system property
-   * @param fragment Fragment
-   * @return
-   */
-  public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
-    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
-      return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
-    } else {
-      return fragment.getLength();
-    }
-  }
-
-  public static final String KEY_STORAGES = "storages"; // storages
-  public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
-  public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
-
-  public static final String KEY_SPACES = "spaces";
-
-  private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
-    Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
-    Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
-
-    if (clazz == null) {
-      throw new RuntimeException("There is no tablespace for " + uri.toString());
-    }
-
-    try {
-      Constructor<? extends Tablespace> constructor =
-          (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
-
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
-        constructor.setAccessible(true);
-        CONSTRUCTORS.put(clazz, constructor);
-      }
-
-      return constructor.newInstance(new Object[]{spaceName, uri});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @VisibleForTesting
-  public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
-    Tablespace existing;
-    synchronized (SPACES_URIS_MAP) {
-      // Remove existing one
-      SPACES_URIS_MAP.remove(space.getName());
-      existing = TABLE_SPACES.remove(space.getUri());
-
-      // Add anotherone for test
-      registerTableSpace(space.name, space.uri, null, true, true);
-    }
-    // if there is an existing one, return it.
-    return Optional.fromNullable(existing);
-  }
-
-  public Iterable<String> getSupportSchemes() {
-    return TABLE_SPACE_HANDLERS.keySet();
-  }
-
-  /**
-   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
-   *
-   * @param uri Table or Table Fragment URI.
-   * @param <T> Tablespace class type
-   * @return Tablespace. If uri is null, the default tablespace will be returned.
-   */
-  public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
-
-    if (uri == null || uri.isEmpty()) {
-      return (Optional<T>) Optional.of(getDefault());
-    }
-
-    Tablespace lastOne = null;
-
-    // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
-    // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
-    for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
-      if (uri.startsWith(entry.getKey().toString())) {
-        lastOne = entry.getValue();
-      }
-    }
-    return (Optional<T>) Optional.fromNullable(lastOne);
-  }
-
-  /**
-   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
-   *
-   * @param uri Table or Table Fragment URI.
-   * @param <T> Tablespace class type
-   * @return Tablespace. If uri is null, the default tablespace will be returned.
-   */
-  public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
-    if (uri == null) {
-      return (Optional<T>) Optional.of(getDefault());
-    } else {
-      return (Optional<T>) get(uri.toString());
-    }
-  }
-
-  /**
-   * It returns the default tablespace. This method ensures that it always return the tablespace.
-   *
-   * @return
-   */
-  public static <T extends Tablespace> T getDefault() {
-    return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
-  }
-
-  public static <T extends Tablespace> T getLocalFs() {
-    return (T) get(LOCAL_FS_URI).get();
-  }
-
-  public static Optional<? extends Tablespace> getByName(String name) {
-    URI uri = SPACES_URIS_MAP.get(name);
-    if (uri != null) {
-      return Optional.of(TABLE_SPACES.get(uri));
-    } else {
-      return Optional.absent();
-    }
-  }
-
-  public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
-    for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
-      String uriScheme = entry.getKey().getScheme();
-      if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
-        return Optional.of(entry.getValue());
-      }
-    }
-
-    return Optional.absent();
-  }
-
-  @Override
-  public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
-    Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get();
-    return space.getTableUri(databaseName, tableName);
-  }
-
-  public static Iterable<Tablespace> getAllTablespaces() {
-    return TABLE_SPACES.values();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/90afaa46/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 77c5d05..52e223d 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -18,9 +18,11 @@
 
 package org.apache.tajo.storage;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.OverridableConf;
+import org.apache.tajo.QueryVars;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -149,7 +151,7 @@ public abstract class Tablespace {
    */
   public abstract StorageProperty getProperty();
 
-  public abstract FormatProperty getFormatProperty(String dataFormat);
+  public abstract FormatProperty getFormatProperty(TableMeta meta);
 
   /**
    * Release storage manager resource
@@ -259,6 +261,14 @@ public abstract class Tablespace {
     return scanner;
   }
 
+  public Appender getAppenderForInsertRow(OverridableConf queryContext,
+                                          TaskAttemptId taskAttemptId,
+                                          TableMeta meta,
+                                          Schema schema,
+                                          Path workDir) throws IOException {
+    return getAppender(queryContext, taskAttemptId, meta, schema, workDir);
+  }
+
   /**
    * Returns Appender instance.
    * @param queryContext Query property.
@@ -395,4 +405,11 @@ public abstract class Tablespace {
       return false;
     }
   }
+
+  public abstract URI getStagingUri(OverridableConf context, String queryId, TableMeta meta) throws IOException;
+
+  public URI prepareStagingSpace(TajoConf conf, String queryId, OverridableConf context,
+                                 TableMeta meta) throws IOException {
+    throw new IOException("Staging the output result is not supported in this storage");
+  }
 }