You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/06/24 00:49:16 UTC

[3/7] tajo git commit: TAJO-1616: Implement TablespaceManager to load Tablespaces.

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/pom.xml b/tajo-storage/tajo-storage-common/pom.xml
index f7c9676..f1d3438 100644
--- a/tajo-storage/tajo-storage-common/pom.xml
+++ b/tajo-storage/tajo-storage-common/pom.xml
@@ -58,6 +58,12 @@ limitations under the License.
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/main/resources/*.json</exclude>
+            <exclude>src/test/resources/*.json</exclude>
+          </excludes>
+        </configuration>
         <executions>
           <execution>
             <phase>verify</phase>
@@ -293,6 +299,10 @@ limitations under the License.
       <groupId>io.netty</groupId>
       <artifactId>netty-buffer</artifactId>
     </dependency>
+    <dependency>
+      <groupId>net.minidev</groupId>
+      <artifactId>json-smart</artifactId>
+    </dependency>
   </dependencies>
 
   <profiles>
@@ -334,4 +344,4 @@ limitations under the License.
       </plugin>
     </plugins>
   </reporting>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
new file mode 100644
index 0000000..0f0cd10
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/FormatProperty.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+public class FormatProperty {
+  private boolean sortedInsertRequired;
+
+  public FormatProperty(boolean sortedInsertRequired) {
+    this.sortedInsertRequired = sortedInsertRequired;
+  }
+
+  public boolean sortedInsertRequired() {
+    return sortedInsertRequired;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
index a8926a0..ce573be 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -64,7 +64,7 @@ public class MergeScanner implements Scanner {
 
     long numBytes = 0;
     for (Fragment eachFileFragment: rawFragmentList) {
-      long fragmentLength = Tablespace.getFragmentLength((TajoConf) conf, eachFileFragment);
+      long fragmentLength = TableSpaceManager.guessFragmentVolume((TajoConf) conf, eachFileFragment);
       if (fragmentLength > 0) {
         numBytes += fragmentLength;
         fragments.add(eachFileFragment);
@@ -131,8 +131,7 @@ public class MergeScanner implements Scanner {
   private Scanner getNextScanner() throws IOException {
     if (iterator.hasNext()) {
       currentFragment = iterator.next();
-      currentScanner = TableSpaceManager.getStorageManager((TajoConf) conf, meta.getStoreType()).getScanner(meta, schema,
-          currentFragment, target);
+      currentScanner = TableSpaceManager.getLocalFs().getScanner(meta, schema, currentFragment, target);
       currentScanner.init();
       return currentScanner;
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
new file mode 100644
index 0000000..12b236f
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/OldStorageManager.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * It handles available table spaces and cache TableSpace instances.
+ */
+public class OldStorageManager {
+  private static final Log LOG = LogFactory.getLog(OldStorageManager.class);
+
+  /**
+   * Cache of scanner handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
+  /**
+   * Cache of appender handlers for each storage type.
+   */
+  protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
+      = new ConcurrentHashMap<String, Class<? extends Appender>>();
+  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
+      Configuration.class,
+      Schema.class,
+      TableMeta.class,
+      Fragment.class
+  };
+  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
+      Configuration.class,
+      TaskAttemptId.class,
+      Schema.class,
+      TableMeta.class,
+      Path.class
+  };
+  /**
+   * Cache of Tablespace.
+   * Key is manager key(warehouse path) + store type
+   */
+  private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
+  /**
+   * Cache of constructors for each class. Pins the classes so they
+   * can't be garbage collected until ReflectionUtils can be collected.
+   */
+  protected static Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
+
+  /**
+   * Clear all class cache
+   */
+  @VisibleForTesting
+  protected synchronized static void clearCache() {
+    CONSTRUCTOR_CACHE.clear();
+    SCANNER_HANDLER_CACHE.clear();
+    APPENDER_HANDLER_CACHE.clear();
+    storageManagers.clear();
+  }
+
+  /**
+   * Close Tablespace
+   * @throws java.io.IOException
+   */
+  public static void shutdown() throws IOException {
+    synchronized(storageManagers) {
+      for (Tablespace eachTablespace : storageManagers.values()) {
+        eachTablespace.close();
+      }
+    }
+    clearCache();
+  }
+
+  /**
+   * Returns the proper Tablespace instance according to the storeType.
+   *
+   * @param tajoConf Tajo system property.
+   * @param storeType Storage type
+   * @return
+   * @throws IOException
+   */
+  public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
+    FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
+    if (fileSystem != null) {
+      return getStorageManager(tajoConf, fileSystem.getUri(), storeType);
+    } else {
+      return getStorageManager(tajoConf, null, storeType);
+    }
+  }
+
+  /**
+   * Returns the proper Tablespace instance according to the storeType
+   *
+   * @param tajoConf Tajo system property.
+   * @param uri Key that can identify each storage manager(may be a path)
+   * @param storeType Storage type
+   * @return
+   * @throws IOException
+   */
+  public static synchronized Tablespace getStorageManager(
+      TajoConf tajoConf, URI uri, String storeType) throws IOException {
+    Preconditions.checkNotNull(tajoConf);
+    Preconditions.checkNotNull(uri);
+    Preconditions.checkNotNull(storeType);
+
+    String typeName;
+    if (storeType.equalsIgnoreCase("HBASE")) {
+      typeName = "hbase";
+    } else {
+      typeName = "hdfs";
+    }
+
+    synchronized (storageManagers) {
+      String storeKey = typeName + "_" + uri.toString();
+      Tablespace manager = storageManagers.get(storeKey);
+
+      if (manager == null) {
+        Class<? extends Tablespace> storageManagerClass =
+            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
+
+        if (storageManagerClass == null) {
+          throw new IOException("Unknown Storage Type: " + typeName);
+        }
+
+        try {
+          Constructor<? extends Tablespace> constructor =
+              (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
+          if (constructor == null) {
+            constructor = storageManagerClass.getDeclaredConstructor(TableSpaceManager.TABLESPACE_PARAM);
+            constructor.setAccessible(true);
+            CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
+          }
+          manager = constructor.newInstance(new Object[]{"noname", uri});
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        manager.init(tajoConf);
+        storageManagers.put(storeKey, manager);
+      }
+
+      return manager;
+    }
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param conf The system property
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param fragment The fragment for scanning
+   * @param target The output schema
+   * @return Scanner instance
+   * @throws IOException
+   */
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
+  }
+
+  /**
+   * Creates a scanner instance.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param schema Input schema
+   * @param meta Table meta data
+   * @param fragment The fragment for scanning
+   * @param <T>
+   * @return The scanner instance
+   */
+  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
+                                         Fragment fragment) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+
+  /**
+   * Creates a scanner instance.
+   *
+   * @param theClass Concrete class of scanner
+   * @param conf System property
+   * @param taskAttemptId Task id
+   * @param meta Table meta data
+   * @param schema Input schema
+   * @param workDir Working directory
+   * @param <T>
+   * @return The scanner instance
+   */
+  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
+                                          TableMeta meta, Schema schema, Path workDir) {
+    T result;
+    try {
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
index 6816d08..38d0734 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageProperty.java
@@ -19,22 +19,51 @@
 package org.apache.tajo.storage;
 
 public class StorageProperty {
-  private boolean supportsInsertInto;
-  private boolean sortedInsert;
+  private boolean movable;
+  private boolean writable;
+  private boolean insertable;
+  private boolean absolutePathAllowed;
 
-  public boolean isSupportsInsertInto() {
-    return supportsInsertInto;
+  public StorageProperty(boolean movable, boolean writable, boolean isInsertable, boolean absolutePathAllowed) {
+    this.movable = movable;
+    this.writable = writable;
+    this.insertable = isInsertable;
+    this.absolutePathAllowed = absolutePathAllowed;
   }
 
-  public void setSupportsInsertInto(boolean supportsInsertInto) {
-    this.supportsInsertInto = supportsInsertInto;
+  /**
+   * Move-like operation is allowed
+   *
+   * @return true if move operation is available
+   */
+  public boolean isMovable() {
+    return movable;
   }
 
-  public boolean isSortedInsert() {
-    return sortedInsert;
+  /**
+   * Is it Writable storage?
+   *
+   * @return true if this storage is writable.
+   */
+  public boolean isWritable() {
+    return writable;
   }
 
-  public void setSortedInsert(boolean sortedInsert) {
-    this.sortedInsert = sortedInsert;
+  /**
+   * this storage supports insert operation?
+   *
+   * @return true if insert operation is allowed.
+   */
+  public boolean isInsertable() {
+    return insertable;
+  }
+
+  /**
+   * Does this storage allows the use of arbitrary absolute paths outside tablespace?
+   *
+   * @return
+   */
+  public boolean isArbitraryPathAllowed() {
+    return this.absolutePathAllowed;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 926b5d3..20a5d5c 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -68,7 +68,7 @@ public class StorageUtil extends StorageConstants {
         return 0;
     }
   }
-  
+
   public static Path concatPath(String parent, String...childs) {
     return concatPath(new Path(parent), childs);
   }
@@ -82,7 +82,7 @@ public class StorageUtil extends StorageConstants {
         sb.append("/");
     }
     
-    return new Path(parent, sb.toString());
+    return new Path(parent + "/" + sb.toString());
   }
 
   static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
index a787cdb..ef04509 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableSpaceManager.java
@@ -19,236 +19,372 @@
 package org.apache.tajo.storage;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.Pair;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
+import java.net.URI;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static org.apache.tajo.storage.StorageConstants.LOCAL_FS_URI;
 
 /**
  * It handles available table spaces and cache TableSpace instances.
+ *
+ * Default tablespace must be a filesystem-based one.
+ * HDFS and S3 can be a default tablespace if a Tajo cluster is in fully distributed mode.
+ * Local file system can be a default tablespace if a Tajo cluster runs on a single machine.
  */
-public class TableSpaceManager {
+public class TableSpaceManager implements StorageService {
+  private static final Log LOG = LogFactory.getLog(TableSpaceManager.class);
 
-  /**
-   * Cache of scanner handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Scanner>>();
-  /**
-   * Cache of appender handlers for each storage type.
-   */
-  protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends Appender>>();
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
-      Configuration.class,
-      Schema.class,
-      TableMeta.class,
-      Fragment.class
-  };
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
-      Configuration.class,
-      TaskAttemptId.class,
-      Schema.class,
-      TableMeta.class,
-      Path.class
-  };
-  /**
-   * Cache of Tablespace.
-   * Key is manager key(warehouse path) + store type
-   */
-  private static final Map<String, Tablespace> storageManagers = Maps.newHashMap();
-  /**
-   * Cache of constructors for each class. Pins the classes so they
-   * can't be garbage collected until ReflectionUtils can be collected.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-      new ConcurrentHashMap<Class<?>, Constructor<?>>();
+  public static final String DEFAULT_CONFIG_FILE = "storage-default.json";
+  public static final String SITE_CONFIG_FILE = "storage-site.json";
+
+  /** default tablespace name */
+  public static final String DEFAULT_TABLESPACE_NAME = "default";
+
+  private final static TajoConf systemConf = new TajoConf();
+  private final static JSONParser parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE | JSONParser.IGNORE_CONTROL_CHAR);
+
+  // The relation ship among name, URI, Tablespaces must be kept 1:1:1.
+  protected static final Map<String, URI> SPACES_URIS_MAP = Maps.newHashMap();
+  protected static final TreeMap<URI, Tablespace> TABLE_SPACES = Maps.newTreeMap();
 
+  protected static final Map<Class<?>, Constructor<?>> CONSTRUCTORS = Maps.newHashMap();
+  protected static final Map<String, Class<? extends Tablespace>> TABLE_SPACE_HANDLERS = Maps.newHashMap();
+
+  public static final Class [] TABLESPACE_PARAM = new Class [] {String.class, URI.class};
+
+  static {
+    instance = new TableSpaceManager();
+  }
   /**
-   * Clear all class cache
+   * Singleton instance
    */
-  @VisibleForTesting
-  protected synchronized static void clearCache() {
-    CONSTRUCTOR_CACHE.clear();
-    SCANNER_HANDLER_CACHE.clear();
-    APPENDER_HANDLER_CACHE.clear();
-    storageManagers.clear();
+  private static final TableSpaceManager instance;
+
+  private TableSpaceManager() {
+    initForDefaultConfig(); // loading storage-default.json
+    initSiteConfig();       // storage-site.json will override the configs of storage-default.json
+    addWarehouseAsSpace();  // adding a warehouse directory for a default tablespace
+    addLocalFsTablespace(); // adding a tablespace using local file system by default
   }
 
-  /**
-   * Close Tablespace
-   * @throws java.io.IOException
-   */
-  public static void shutdown() throws IOException {
-    synchronized(storageManagers) {
-      for (Tablespace eachTablespace : storageManagers.values()) {
-        eachTablespace.close();
-      }
+  private void addWarehouseAsSpace() {
+    Path warehouseDir = TajoConf.getWarehouseDir(systemConf);
+    registerTableSpace(DEFAULT_TABLESPACE_NAME, warehouseDir.toUri(), null, true, false);
+  }
+
+  private void addLocalFsTablespace() {
+    if (TABLE_SPACES.headMap(LOCAL_FS_URI, true).firstEntry() == null) {
+      String tmpName = UUID.randomUUID().toString();
+      registerTableSpace(tmpName, LOCAL_FS_URI, null, false, false);
     }
-    clearCache();
   }
 
-  /**
-   * Returns FileStorageManager instance.
-   *
-   * @param tajoConf Tajo system property.
-   * @return
-   * @throws IOException
-   */
-  public static Tablespace getFileStorageManager(TajoConf tajoConf) throws IOException {
-    return getStorageManager(tajoConf, "CSV");
+  public static TableSpaceManager getInstance() {
+    return instance;
   }
 
-  /**
-   * Returns the proper Tablespace instance according to the storeType.
-   *
-   * @param tajoConf Tajo system property.
-   * @param storeType Storage type
-   * @return
-   * @throws IOException
-   */
-  public static Tablespace getStorageManager(TajoConf tajoConf, String storeType) throws IOException {
-    FileSystem fileSystem = TajoConf.getWarehouseDir(tajoConf).getFileSystem(tajoConf);
-    if (fileSystem != null) {
-      return getStorageManager(tajoConf, storeType, fileSystem.getUri().toString());
-    } else {
-      return getStorageManager(tajoConf, storeType, null);
+  private void initForDefaultConfig() {
+    JSONObject json = loadFromConfig(DEFAULT_CONFIG_FILE);
+    if (json == null) {
+      throw new IllegalStateException("There is no " + SITE_CONFIG_FILE);
     }
+    applyConfig(json, false);
   }
 
-  /**
-   * Returns the proper Tablespace instance according to the storeType
-   *
-   * @param tajoConf Tajo system property.
-   * @param storeType Storage type
-   * @param managerKey Key that can identify each storage manager(may be a path)
-   * @return
-   * @throws IOException
-   */
-  private static synchronized Tablespace getStorageManager (
-      TajoConf tajoConf, String storeType, String managerKey) throws IOException {
+  private void initSiteConfig() {
+    JSONObject json = loadFromConfig(SITE_CONFIG_FILE);
+
+    // if there is no storage-site.json file, nothing happen.
+    if (json != null) {
+      applyConfig(json, true);
+    }
+  }
+
+  private JSONObject loadFromConfig(String fileName) {
+    String json;
+    try {
+      json = FileUtil.readTextFileFromResource(fileName);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
 
-    String typeName;
-    if (storeType.equalsIgnoreCase("HBASE")) {
-      typeName = "hbase";
+    if (json != null) {
+      return parseJson(json);
     } else {
-      typeName = "hdfs";
+      return null;
     }
+  }
+
+  private static JSONObject parseJson(String json) {
+    try {
+      return (JSONObject) parser.parse(json);
+    } catch (ParseException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
-    synchronized (storageManagers) {
-      String storeKey = typeName + "_" + managerKey;
-      Tablespace manager = storageManagers.get(storeKey);
+  private void applyConfig(JSONObject json, boolean override) {
+    loadStorages(json);
+    loadTableSpaces(json, override);
+  }
 
-      if (manager == null) {
-        Class<? extends Tablespace> storageManagerClass =
-            tajoConf.getClass(String.format("tajo.storage.manager.%s.class", typeName), null, Tablespace.class);
+  private void loadStorages(JSONObject json) {
+    JSONObject spaces = (JSONObject) json.get(KEY_STORAGES);
 
-        if (storageManagerClass == null) {
-          throw new IOException("Unknown Storage Type: " + typeName);
-        }
+    if (spaces != null) {
+      Pair<String, Class<? extends Tablespace>> pair = null;
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
 
         try {
-          Constructor<? extends Tablespace> constructor =
-              (Constructor<? extends Tablespace>) CONSTRUCTOR_CACHE.get(storageManagerClass);
-          if (constructor == null) {
-            constructor = storageManagerClass.getDeclaredConstructor(new Class<?>[]{String.class});
-            constructor.setAccessible(true);
-            CONSTRUCTOR_CACHE.put(storageManagerClass, constructor);
-          }
-          manager = constructor.newInstance(new Object[]{storeType});
-        } catch (Exception e) {
-          throw new RuntimeException(e);
+          pair = extractStorage(entry);
+        } catch (ClassNotFoundException e) {
+          LOG.warn(e);
+          continue;
         }
-        manager.init(tajoConf);
-        storageManagers.put(storeKey, manager);
+
+        TABLE_SPACE_HANDLERS.put(pair.getFirst(), pair.getSecond());
+      }
+    }
+  }
+
+  private Pair<String, Class<? extends Tablespace>> extractStorage(Map.Entry<String, Object> entry)
+      throws ClassNotFoundException {
+
+    String storageType = entry.getKey();
+    JSONObject storageDesc = (JSONObject) entry.getValue();
+    String handlerClass = (String) storageDesc.get(KEY_STORAGE_HANDLER);
+
+    return new Pair<String, Class<? extends Tablespace>>(
+        storageType,(Class<? extends Tablespace>) Class.forName(handlerClass));
+  }
+
+  private void loadTableSpaces(JSONObject json, boolean override) {
+    JSONObject spaces = (JSONObject) json.get(KEY_SPACES);
+
+    if (spaces != null) {
+      for (Map.Entry<String, Object> entry : spaces.entrySet()) {
+        AddTableSpace(entry.getKey(), (JSONObject) entry.getValue(), override);
       }
+    }
+  }
 
-      return manager;
+  public static void AddTableSpace(String spaceName, JSONObject spaceDesc, boolean override) {
+    boolean defaultSpace = Boolean.parseBoolean(spaceDesc.getAsString("default"));
+    URI spaceUri = URI.create(spaceDesc.getAsString("uri"));
+
+    if (defaultSpace) {
+      registerTableSpace(DEFAULT_TABLESPACE_NAME, spaceUri, spaceDesc, true, override);
     }
+    registerTableSpace(spaceName, spaceUri, spaceDesc, true, override);
   }
 
-  /**
-   * Returns Scanner instance.
-   *
-   * @param conf The system property
-   * @param meta The table meta
-   * @param schema The input schema
-   * @param fragment The fragment for scanning
-   * @param target The output schema
-   * @return Scanner instance
-   * @throws IOException
-   */
-  public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
-    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
+  private static void registerTableSpace(String spaceName, URI uri, JSONObject spaceDesc,
+                                         boolean visible, boolean override) {
+    Tablespace tableSpace = initializeTableSpace(spaceName, uri, visible);
+    tableSpace.setVisible(visible);
+
+    try {
+      tableSpace.init(systemConf);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    putTablespace(tableSpace, override);
+
+    // If the arbitrary path is allowed, root uri is also added as a tablespace
+    if (tableSpace.getProperty().isArbitraryPathAllowed()) {
+      URI rootUri = tableSpace.getRootUri();
+      // if there already exists or the rootUri is 'file:/', it won't overwrite the tablespace.
+      if (!TABLE_SPACES.containsKey(rootUri) && !rootUri.toString().startsWith(LOCAL_FS_URI.toString())) {
+        String tmpName = UUID.randomUUID().toString();
+        registerTableSpace(tmpName, rootUri, spaceDesc, false, override);
+      }
+    }
+  }
+
+  private static void putTablespace(Tablespace space, boolean override) {
+    // It is a device to keep the relationship among name, URI, and tablespace 1:1:1.
+
+    boolean nameExist = SPACES_URIS_MAP.containsKey(space.getName());
+    boolean uriExist = TABLE_SPACES.containsKey(space.uri);
+
+    boolean mismatch = nameExist && !SPACES_URIS_MAP.get(space.getName()).equals(space.getUri());
+    mismatch = mismatch || uriExist && TABLE_SPACES.get(space.uri).equals(space);
+
+    if (!override && mismatch) {
+      throw new RuntimeException("Name or URI of Tablespace must be unique.");
+    }
+
+    SPACES_URIS_MAP.put(space.getName(), space.getUri());
+    // We must guarantee that the same uri results in the same tablespace instance.
+    TABLE_SPACES.put(space.getUri(), space);
   }
 
   /**
-   * Creates a scanner instance.
+   * Return length of the fragment.
+   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
    *
-   * @param theClass Concrete class of scanner
-   * @param conf System property
-   * @param schema Input schema
-   * @param meta Table meta data
-   * @param fragment The fragment for scanning
-   * @param <T>
-   * @return The scanner instance
+   * @param conf Tajo system property
+   * @param fragment Fragment
+   * @return
    */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta,
-                                         Fragment fragment) {
-    T result;
+  public static long guessFragmentVolume(TajoConf conf, Fragment fragment) {
+    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
+      return conf.getLongVar(TajoConf.ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
+    } else {
+      return fragment.getLength();
+    }
+  }
+
+  public static final String KEY_STORAGES = "storages"; // storages
+  public static final String KEY_STORAGE_HANDLER = "handler"; // storages/?/handler
+  public static final String KEY_STORAGE_DEFAULT_FORMAT = "default-format"; // storages/?/default-format
+
+  public static final String KEY_SPACES = "spaces";
+
+  private static Tablespace initializeTableSpace(String spaceName, URI uri, boolean visible) {
+    Preconditions.checkNotNull(uri.getScheme(), "URI must include scheme, but it was " + uri);
+    Class<? extends Tablespace> clazz = TABLE_SPACE_HANDLERS.get(uri.getScheme());
+
+    if (clazz == null) {
+      throw new RuntimeException("There is no tablespace for " + uri.toString());
+    }
+
     try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
+      Constructor<? extends Tablespace> constructor =
+          (Constructor<? extends Tablespace>) CONSTRUCTORS.get(clazz);
+
+      if (constructor == null) {
+        constructor = clazz.getDeclaredConstructor(TABLESPACE_PARAM);
+        constructor.setAccessible(true);
+        CONSTRUCTORS.put(clazz, constructor);
       }
-      result = meth.newInstance(new Object[]{conf, schema, meta, fragment});
+
+      return constructor.newInstance(new Object[]{spaceName, uri});
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
+  }
+
+  @VisibleForTesting
+  public static Optional<Tablespace> addTableSpaceForTest(Tablespace space) {
+    Tablespace existing;
+    synchronized (SPACES_URIS_MAP) {
+      // Remove existing one
+      SPACES_URIS_MAP.remove(space.getName());
+      existing = TABLE_SPACES.remove(space.getUri());
+
+      // Add anotherone for test
+      registerTableSpace(space.name, space.uri, null, true, true);
+    }
+    // if there is an existing one, return it.
+    return Optional.fromNullable(existing);
+  }
 
-    return result;
+  public Iterable<String> getSupportSchemes() {
+    return TABLE_SPACE_HANDLERS.keySet();
   }
 
   /**
-   * Creates a scanner instance.
+   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
    *
-   * @param theClass Concrete class of scanner
-   * @param conf System property
-   * @param taskAttemptId Task id
-   * @param meta Table meta data
-   * @param schema Input schema
-   * @param workDir Working directory
-   * @param <T>
-   * @return The scanner instance
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null, the default tablespace will be returned.
    */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TaskAttemptId taskAttemptId,
-                                          TableMeta meta, Schema schema, Path workDir) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
+  public static <T extends Tablespace> Optional<T> get(@Nullable String uri) {
+
+    if (uri == null || uri.isEmpty()) {
+      return (Optional<T>) Optional.of(getDefault());
+    }
+
+    Tablespace lastOne = null;
+
+    // Find the longest matched one. For example, assume that the caller tries to find /x/y/z, and
+    // there are /x and /x/y. In this case, /x/y will be chosen because it is more specific.
+    for (Map.Entry<URI, Tablespace> entry: TABLE_SPACES.headMap(URI.create(uri), true).entrySet()) {
+      if (uri.startsWith(entry.getKey().toString())) {
+        lastOne = entry.getValue();
       }
-      result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
     }
+    return (Optional<T>) Optional.fromNullable(lastOne);
+  }
+
+  /**
+   * Get tablespace for the given URI. If uri is null, the default tablespace will be returned
+   *
+   * @param uri Table or Table Fragment URI.
+   * @param <T> Tablespace class type
+   * @return Tablespace. If uri is null, the default tablespace will be returned.
+   */
+  public static <T extends Tablespace> Optional<T> get(@Nullable URI uri) {
+    if (uri == null) {
+      return (Optional<T>) Optional.of(getDefault());
+    } else {
+      return (Optional<T>) get(uri.toString());
+    }
+  }
+
+  /**
+   * It returns the default tablespace. This method ensures that it always return the tablespace.
+   *
+   * @return
+   */
+  public static <T extends Tablespace> T getDefault() {
+    return (T) getByName(DEFAULT_TABLESPACE_NAME).get();
+  }
+
+  public static <T extends Tablespace> T getLocalFs() {
+    return (T) get(LOCAL_FS_URI).get();
+  }
+
+  public static Optional<? extends Tablespace> getByName(String name) {
+    URI uri = SPACES_URIS_MAP.get(name);
+    if (uri != null) {
+      return Optional.of(TABLE_SPACES.get(uri));
+    } else {
+      return Optional.absent();
+    }
+  }
+
+  public static Optional<? extends Tablespace> getAnyByScheme(String scheme) {
+    for (Map.Entry<URI, Tablespace> entry : TABLE_SPACES.entrySet()) {
+      String uriScheme = entry.getKey().getScheme();
+      if (uriScheme != null && uriScheme.equalsIgnoreCase(scheme)) {
+        return Optional.of(entry.getValue());
+      }
+    }
+
+    return Optional.absent();
+  }
+
+  @Override
+  public URI getTableURI(@Nullable String spaceName, String databaseName, String tableName) {
+    Tablespace space = spaceName == null ? getDefault() : getByName(spaceName).get();
+    return space.getTableUri(databaseName, tableName);
+  }
 
-    return result;
+  public static Iterable<Tablespace> getAllTablespaces() {
+    return TABLE_SPACES.values();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
index 0626da8..77c5d05 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Tablespace.java
@@ -19,10 +19,8 @@
 package org.apache.tajo.storage;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.OverridableConf;
-import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
@@ -30,16 +28,20 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 
 import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Tablespace manages the functions of storing and reading data.
@@ -49,18 +51,24 @@ import java.util.List;
  */
 public abstract class Tablespace {
 
-  public static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
+  protected final String name;
+  protected final URI uri;
+  /** this space is visible or not. */
+  protected boolean visible = true;
 
   protected TajoConf conf;
-  protected String storeType;
 
-  public Tablespace(String storeType) {
-    this.storeType = storeType;
+  public Tablespace(String name, URI uri) {
+    this.name = name;
+    this.uri = uri;
+  }
+
+  public void setVisible(boolean visible) {
+    this.visible = visible;
+  }
+
+  public Set<String> getDependencies() {
+    return Collections.emptySet();
   }
 
   /**
@@ -69,24 +77,47 @@ public abstract class Tablespace {
    */
   protected abstract void storageInit() throws IOException;
 
+  public String getName() {
+    return name;
+  }
+
+  public URI getUri() {
+    return uri;
+  }
+
+  public boolean isVisible() {
+    return visible;
+  }
+
+  public abstract void setConfig(String name, String value);
+
+  public abstract void setConfigs(Map<String, String> configs);
+
+  public String toString() {
+    return name + "=" + uri.toString();
+  }
+
+  public abstract long getTableVolume(URI uri) throws IOException;
+
   /**
-   * This method is called after executing "CREATE TABLE" statement.
-   * If a storage is a file based storage, a storage manager may create directory.
+   * if {@link StorageProperty#isArbitraryPathAllowed} is true,
+   * the storage allows arbitrary path accesses. In this case, the storage must provide the root URI.
    *
-   * @param tableDesc Table description which is created.
-   * @param ifNotExists Creates the table only when the table does not exist.
-   * @throws java.io.IOException
+   * @see {@link StorageProperty#isArbitraryPathAllowed}
+   * @return Root URI
    */
-  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
+  public URI getRootUri() {
+    throw new UnsupportedException(
+        String.format("Tablespace '%s' does not allow the use of artibrary paths", uri.toString()));
+  }
 
   /**
-   * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
-   * which is the option to delete all the data.
+   * Get Table URI
    *
-   * @param tableDesc
-   * @throws java.io.IOException
+   * @param tableName
+   * @return
    */
-  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+  public abstract URI getTableUri(String databaseName, String tableName);
 
   /**
    * Returns the splits that will serve as input for the scan tasks. The
@@ -116,7 +147,9 @@ public abstract class Tablespace {
    * It returns the storage property.
    * @return The storage property
    */
-  public abstract StorageProperty getStorageProperty();
+  public abstract StorageProperty getProperty();
+
+  public abstract FormatProperty getFormatProperty(String dataFormat);
 
   /**
    * Release storage manager resource
@@ -137,21 +170,10 @@ public abstract class Tablespace {
    * @throws java.io.IOException
    */
   public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
-                                                   Schema inputSchema, SortSpec[] sortSpecs,
+                                                   Schema inputSchema, SortSpec [] sortSpecs,
                                                    TupleRange dataRange) throws IOException;
 
   /**
-   * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
-   * In general Tajo creates the target table after finishing the final sub-query of CATS.
-   * But In the special cases, such as HBase INSERT or CAST query uses the target table information.
-   * That kind of the storage should implements the logic related to creating table in this method.
-   *
-   * @param node The child node of the root node.
-   * @throws java.io.IOException
-   */
-  public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException;
-
-  /**
    * It is called when the query failed.
    * Each storage manager should implement to be processed when the query fails in this method.
    *
@@ -160,21 +182,13 @@ public abstract class Tablespace {
    */
 
   /**
-   * Returns the current storage type.
-   * @return
-   */
-  public String getStoreType() {
-    return storeType;
-  }
-
-  /**
    * Initialize Tablespace instance. It should be called before using.
    *
    * @param tajoConf
    * @throws java.io.IOException
    */
   public void init(TajoConf tajoConf) throws IOException {
-    this.conf = tajoConf;
+    this.conf = new TajoConf(tajoConf);
     storageInit();
   }
 
@@ -239,7 +253,7 @@ public abstract class Tablespace {
     Scanner scanner;
 
     Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
-    scanner = TableSpaceManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
+    scanner = OldStorageManager.newScannerInstance(scannerClass, conf, schema, meta, fragment);
     scanner.setTarget(target.toArray());
 
     return scanner;
@@ -263,18 +277,18 @@ public abstract class Tablespace {
     Class<? extends Appender> appenderClass;
 
     String handlerName = meta.getStoreType().toLowerCase();
-    appenderClass = TableSpaceManager.APPENDER_HANDLER_CACHE.get(handlerName);
+    appenderClass = OldStorageManager.APPENDER_HANDLER_CACHE.get(handlerName);
     if (appenderClass == null) {
       appenderClass = conf.getClass(
           String.format("tajo.storage.appender-handler.%s.class", handlerName), null, Appender.class);
-      TableSpaceManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+      OldStorageManager.APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
     }
 
     if (appenderClass == null) {
       throw new IOException("Unknown Storage Type: " + meta.getStoreType());
     }
 
-    appender = TableSpaceManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
+    appender = OldStorageManager.newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir);
 
     return appender;
   }
@@ -288,11 +302,11 @@ public abstract class Tablespace {
    */
   public Class<? extends Scanner> getScannerClass(String storeType) throws IOException {
     String handlerName = storeType.toLowerCase();
-    Class<? extends Scanner> scannerClass = TableSpaceManager.SCANNER_HANDLER_CACHE.get(handlerName);
+    Class<? extends Scanner> scannerClass = OldStorageManager.SCANNER_HANDLER_CACHE.get(handlerName);
     if (scannerClass == null) {
       scannerClass = conf.getClass(
           String.format("tajo.storage.scanner-handler.%s.class", handlerName), null, Scanner.class);
-      TableSpaceManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
+      OldStorageManager.SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
     }
 
     if (scannerClass == null) {
@@ -303,43 +317,54 @@ public abstract class Tablespace {
   }
 
   /**
-   * Return length of the fragment.
-   * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration.
+   * It is called after making logical plan. Storage manager should verify the schema for inserting.
    *
-   * @param conf Tajo system property
-   * @param fragment Fragment
-   * @return
+   * @param tableDesc The table description of insert target.
+   * @param outSchema  The output schema of select query for inserting.
+   * @throws java.io.IOException
    */
-  public static long getFragmentLength(TajoConf conf, Fragment fragment) {
-    if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) {
-      return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH);
-    } else {
-      return fragment.getLength();
-    }
+  public abstract void verifySchemaToWrite(TableDesc tableDesc, Schema outSchema) throws IOException;
+
+  /**
+   * Rewrite the logical plan. It is assumed that the final plan will be given in this method.
+   */
+  public void rewritePlan(OverridableConf context, LogicalPlan plan) throws PlanningException {
+    // nothing to do by default
   }
 
-  public abstract void rollbackOutputCommit(LogicalNode node) throws IOException;
+  ////////////////////////////////////////////////////////////////////////////
+  // Table Lifecycle Section
+  ////////////////////////////////////////////////////////////////////////////
 
   /**
-   * It is called after making logical plan. Storage manager should verify the schema for inserting.
+   * This method is called after executing "CREATE TABLE" statement.
+   * If a storage is a file based storage, a storage manager may create directory.
    *
-   * @param tableDesc The table description of insert target.
-   * @param outSchema  The output schema of select query for inserting.
+   * @param tableDesc Table description which is created.
+   * @param ifNotExists Creates the table only when the table does not exist.
    * @throws java.io.IOException
    */
-  public abstract void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException;
+  public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException;
 
   /**
-   * Returns the list of storage specified rewrite rules.
-   * This values are used by LogicalOptimizer.
+   * This method is called after executing "DROP TABLE" statement with the 'PURGE' option
+   * which is the option to delete all the data.
    *
-   * @param queryContext The query property
-   * @param tableDesc The description of the target table.
-   * @return The list of storage specified rewrite rules
+   * @param tableDesc
    * @throws java.io.IOException
    */
-  public abstract List<LogicalPlanRewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc)
-      throws IOException;
+  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+
+  /**
+   * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'.
+   * In general Tajo creates the target table after finishing the final sub-query of CATS.
+   * But In the special cases, such as HBase INSERT or CAST query uses the target table information.
+   * That kind of the storage should implements the logic related to creating table in this method.
+   *
+   * @param node The child node of the root node.
+   * @throws java.io.IOException
+   */
+  public abstract void prepareTable(LogicalNode node) throws IOException;
 
   /**
    * Finalizes result data. Tajo stores result data in the staging directory.
@@ -354,7 +379,20 @@ public abstract class Tablespace {
    * @return Saved path
    * @throws java.io.IOException
    */
-  public abstract Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId,
-                               LogicalPlan plan, Schema schema,
-                               TableDesc tableDesc) throws IOException;
+  public abstract Path commitTable(OverridableConf queryContext,
+                                   ExecutionBlockId finalEbId,
+                                   LogicalPlan plan, Schema schema,
+                                   TableDesc tableDesc) throws IOException;
+
+  public abstract void rollbackTable(LogicalNode node) throws IOException;
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Tablespace) {
+      Tablespace other = (Tablespace) obj;
+      return name.equals(other.name) && uri.equals(other.uri);
+    } else {
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
new file mode 100644
index 0000000..40e17f4
--- /dev/null
+++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.json
@@ -0,0 +1,20 @@
+{
+  "storages": {
+    "hdfs": {
+      "handler": "org.apache.tajo.storage.FileTablespace",
+      "default-format": "text"
+    },
+    "file": {
+      "handler": "org.apache.tajo.storage.FileTablespace",
+      "default-format": "text"
+    },
+    "s3": {
+      "handler": "org.apache.tajo.storage.FileTablespace",
+      "default-format": "text"
+    },
+    "hbase": {
+      "handler": "org.apache.tajo.storage.hbase.HBaseTablespace",
+      "default-format": "hbase"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/pom.xml b/tajo-storage/tajo-storage-hbase/pom.xml
index 3456b76..5a1dc9a 100644
--- a/tajo-storage/tajo-storage-hbase/pom.xml
+++ b/tajo-storage/tajo-storage-hbase/pom.xml
@@ -61,6 +61,12 @@
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/main/resources/*.json</exclude>
+            <exclude>src/test/resources/*.json</exclude>
+          </excludes>
+        </configuration>
         <executions>
           <execution>
             <phase>verify</phase>
@@ -182,6 +188,11 @@
       <artifactId>tajo-storage-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
index 425f392..0fc2922 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -89,7 +89,7 @@ public abstract class AbstractHBaseAppender implements Appender {
     if (enabledStats) {
       stats = new TableStatistics(this.schema);
     }
-    columnMapping = new ColumnMapping(schema, meta);
+    columnMapping = new ColumnMapping(schema, meta.getOptions());
 
     mappingColumnFamilies = columnMapping.getMappingColumns();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
deleted file mode 100644
index 32f1e43..0000000
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.OverridableConf;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalRootNode;
-import org.apache.tajo.plan.logical.SortNode;
-import org.apache.tajo.plan.logical.SortNode.SortPurpose;
-import org.apache.tajo.plan.logical.UnaryNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
-
-public class AddSortForInsertRewriter implements LogicalPlanRewriteRule {
-  private int[] sortColumnIndexes;
-  private Column[] sortColumns;
-
-  public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
-    this.sortColumns = sortColumns;
-    this.sortColumnIndexes = new int[sortColumns.length];
-
-    Schema tableSchema = tableDesc.getSchema();
-    for (int i = 0; i < sortColumns.length; i++) {
-      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
-    }
-  }
-
-  @Override
-  public String getName() {
-    return "AddSortForInsertRewriter";
-  }
-
-  @Override
-  public boolean isEligible(OverridableConf queryContext, LogicalPlan plan) {
-    String storeType = PlannerUtil.getStoreType(plan);
-    return storeType != null;
-  }
-
-  @Override
-  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
-    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-    UnaryNode insertNode = rootNode.getChild();
-    LogicalNode childNode = insertNode.getChild();
-
-    Schema sortSchema = childNode.getOutSchema();
-    SortNode sortNode = plan.createNode(SortNode.class);
-    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
-    sortNode.setInSchema(sortSchema);
-    sortNode.setOutSchema(sortSchema);
-
-    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
-    int index = 0;
-
-    for (int i = 0; i < sortColumnIndexes.length; i++) {
-      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
-      if (sortColumn == null) {
-        throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
-      }
-      sortSpecs[index++] = new SortSpec(sortColumn, true, true);
-    }
-    sortNode.setSortSpecs(sortSpecs);
-
-    sortNode.setChild(insertNode.getChild());
-    insertNode.setChild(sortNode);
-    plan.getRootBlock().registerNode(sortNode);
-
-    return plan;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
index e66a707..0314e8e 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
@@ -20,18 +20,18 @@ package org.apache.tajo.storage.hbase;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.util.BytesUtils;
+import org.apache.tajo.util.KeyValueSet;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 public class ColumnMapping {
-  private TableMeta tableMeta;
   private Schema schema;
-  private char rowKeyDelimiter;
+  private KeyValueSet tableProperty;
 
+  private char rowKeyDelimiter;
   private String hbaseTableName;
 
   private int[] rowKeyFieldIndexes;
@@ -45,16 +45,15 @@ public class ColumnMapping {
 
   private int numRowKeys;
 
-  public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
+  public ColumnMapping(Schema schema, KeyValueSet tableProperty) throws IOException{
     this.schema = schema;
-    this.tableMeta = tableMeta;
-
+    this.tableProperty = tableProperty;
     init();
   }
 
   public void init() throws IOException {
-    hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
-    String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
+    hbaseTableName = tableProperty.get(HBaseStorageConstants.META_TABLE_KEY);
+    String delim = tableProperty.get(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
     if (delim.length() > 0) {
       rowKeyDelimiter = delim.charAt(0);
     }
@@ -70,7 +69,7 @@ public class ColumnMapping {
       rowKeyFieldIndexes[i] = -1;
     }
 
-    String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
+    String columnMapping = tableProperty.get(HBaseStorageConstants.META_COLUMNS_KEY, "");
     if (columnMapping == null || columnMapping.isEmpty()) {
       throw new IOException("'columns' property is required.");
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
index 9ea0bf6..5961751 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
@@ -29,8 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.hbase.StorageFragmentProtos.*;
 
+import java.net.URI;
+
 public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
   @Expose
+  private URI uri;
+  @Expose
   private String tableName;
   @Expose
   private String hbaseTableName;
@@ -45,7 +49,9 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
   @Expose
   private long length;
 
-  public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) {
+  public HBaseFragment(URI uri, String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow,
+                       String regionLocation) {
+    this.uri = uri;
     this.tableName = tableName;
     this.hbaseTableName = hbaseTableName;
     this.startRow = startRow;
@@ -62,6 +68,7 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
   }
 
   private void init(HBaseFragmentProto proto) {
+    this.uri = URI.create(proto.getUri());
     this.tableName = proto.getTableName();
     this.hbaseTableName = proto.getHbaseTableName();
     this.startRow = proto.getStartRow().toByteArray();
@@ -76,6 +83,10 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
     return Bytes.compareTo(startRow, t.startRow);
   }
 
+  public URI getUri() {
+    return uri;
+  }
+
   @Override
   public String getTableName() {
     return tableName;
@@ -107,6 +118,7 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
 
   public Object clone() throws CloneNotSupportedException {
     HBaseFragment frag = (HBaseFragment) super.clone();
+    frag.uri = uri;
     frag.tableName = tableName;
     frag.hbaseTableName = hbaseTableName;
     frag.startRow = startRow;
@@ -137,16 +149,20 @@ public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Clone
 
   @Override
   public String toString() {
-    return "\"fragment\": {\"tableName\": \""+ tableName + "\", hbaseTableName\": \"" + hbaseTableName + "\"" +
-        ", \"startRow\": \"" + new String(startRow) + "\"" +
-        ", \"stopRow\": \"" + new String(stopRow) + "\"" +
-        ", \"length\": \"" + length + "\"}" ;
+    return
+        "\"fragment\": {\"uri:\"" + uri.toString() +"\", \"tableName\": \""+ tableName +
+            "\", hbaseTableName\": \"" + hbaseTableName + "\"" +
+            ", \"startRow\": \"" + new String(startRow) + "\"" +
+            ", \"stopRow\": \"" + new String(stopRow) + "\"" +
+            ", \"length\": \"" + length + "\"}" ;
   }
 
   @Override
   public FragmentProto getProto() {
     HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
-    builder.setTableName(tableName)
+    builder
+        .setUri(uri.toString())
+        .setTableName(tableName)
         .setHbaseTableName(hbaseTableName)
         .setStartRow(ByteString.copyFrom(startRow))
         .setStopRow(ByteString.copyFrom(stopRow))

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 19fdf80..916aae7 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -26,28 +26,29 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.storage.TableSpaceManager;
 import org.apache.tajo.storage.Tuple;
 
 import java.io.IOException;
+import java.net.URI;
 
 public class HBasePutAppender extends AbstractHBaseAppender {
+  private URI uri;
   private HTableInterface htable;
   private long totalNumBytes;
 
-  public HBasePutAppender(Configuration conf, TaskAttemptId taskAttemptId,
+  public HBasePutAppender(Configuration conf, URI uri, TaskAttemptId taskAttemptId,
                           Schema schema, TableMeta meta, Path stagingDir) {
     super(conf, taskAttemptId, schema, meta, stagingDir);
+    this.uri = uri;
   }
 
   @Override
   public void init() throws IOException {
     super.init();
 
-    Configuration hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
-    HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager((TajoConf) conf, "HBASE"))
-        .getConnection(hbaseConf);
+    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(uri).get();
+    HConnection hconn = space.getConnection();
     htable = hconn.getTable(columnMapping.getHbaseTableName());
     htable.setAutoFlushTo(false);
     htable.setWriteBufferSize(5 * 1024 * 1024);

http://git-wip-us.apache.org/repos/asf/tajo/blob/6cc7e5c6/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
index 992c13c..16f4c14 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseScanner.java
@@ -117,7 +117,7 @@ public class HBaseScanner implements Scanner {
       targets = schema.toArray();
     }
 
-    columnMapping = new ColumnMapping(schema, meta);
+    columnMapping = new ColumnMapping(schema, meta.getOptions());
     targetIndexes = new int[targets.length];
     int index = 0;
     for (Column eachTargetColumn: targets) {
@@ -133,8 +133,8 @@ public class HBaseScanner implements Scanner {
     rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
     rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
 
-    hbaseConf = HBaseTablespace.getHBaseConfiguration(conf, meta);
-
+    HBaseTablespace space = (HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get();
+    hbaseConf = space.getHbaseConf();
     initScanner();
   }
 
@@ -181,8 +181,7 @@ public class HBaseScanner implements Scanner {
     }
 
     if (htable == null) {
-      HConnection hconn = ((HBaseTablespace) TableSpaceManager.getStorageManager(conf, "HBASE"))
-          .getConnection(hbaseConf);
+      HConnection hconn = ((HBaseTablespace) TableSpaceManager.get(fragment.getUri()).get()).getConnection();
       htable = hconn.getTable(fragment.getHbaseTableName());
     }
     scanner = htable.getScanner(scan);