You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/25 02:36:25 UTC

[10/13] TAJO-353: Add Database support to Tajo. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index e384194..974ded9 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -31,21 +31,34 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class DerbyStore extends AbstractDBStore {
+
+  /** 2014-03-20: First versioning */
+  private static final int DERBY_STORE_VERSION_2 = 2;
+  /** Before 2013-03-20 */
+  private static final int DERBY_STORE_VERSION_1 = 1;
   private static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
 
   protected String getCatalogDriverName(){
     return CATALOG_DRIVER;
   }
 
-  public DerbyStore(final Configuration conf)
-      throws InternalException {
+  public DerbyStore(final Configuration conf) throws InternalException {
     super(conf);
   }
 
+  public int getDriverVersion() {
+    return DERBY_STORE_VERSION_2;
+  }
+
   protected Connection createConnection(Configuration conf) throws SQLException {
     return DriverManager.getConnection(getCatalogUri());
   }
 
+  @Override
+  public String readSchemaFile(String filename) throws CatalogException {
+    return super.readSchemaFile("derby/" + filename);
+  }
+
   // TODO - DDL and index statements should be renamed
   protected void createBaseTable() throws CatalogException {
     Connection conn = null;
@@ -55,62 +68,79 @@ public class DerbyStore extends AbstractDBStore {
       conn = getConnection();
       stmt = conn.createStatement();
 
-      StringBuilder sql = new StringBuilder();
-
       //META
       if (!baseTableMaps.get(TB_META)) {
-        sql.append("CREATE TABLE ");
-        sql.append(TB_META);
-        sql.append(" (version int NOT NULL)");
+
+        String sql = super.readSchemaFile("common/meta.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.executeUpdate(sql.toString());
-        LOG.info("Table '" + TB_META + " is created.");
+
+        stmt.executeUpdate(sql);
+
+        LOG.info("Table '" + TB_META + "' is created.");
         baseTableMaps.put(TB_META, true);
       }
 
+      // TABLE SPACES
+      if (!baseTableMaps.get(TB_SPACES)) {
+
+        String sql = readSchemaFile("tablespaces.sql");
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+
+        stmt.executeUpdate(sql);
+
+        LOG.info("Table '" + TB_SPACES + "' is created.");
+        baseTableMaps.put(TB_SPACES, true);
+      }
+
+      // DATABASES
+      if (!baseTableMaps.get(TB_DATABASES)) {
+        String sql = readSchemaFile("databases.sql");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.addBatch(sql);
+
+        sql = readSchemaFile("databases_idx.sql");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.addBatch(sql);
+
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_DATABASES + "' is created.");
+        baseTableMaps.put(TB_DATABASES, true);
+      }
+
       // TABLES
       if (!baseTableMaps.get(TB_TABLES)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ");
-        sql.append(TB_TABLES);
-        sql.append(" (");
-        sql.append("TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),");
-        sql.append(C_TABLE_ID);
-        sql.append(" VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, ");
-        sql.append("path VARCHAR(1024), ");
-        sql.append("store_type CHAR(16), ");
-        sql.append("CONSTRAINT TABLES_PK PRIMARY KEY (TID)");
-        sql.append( ")");
 
+        String sql = readSchemaFile("tables.sql");
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE UNIQUE INDEX idx_tables_tid on ");
-        sql.append(TB_TABLES);
-        sql.append(" (TID)");
 
+        sql = "CREATE UNIQUE INDEX idx_tables_tid on TABLES (TID)";
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE UNIQUE INDEX idx_tables_name on ");
-        sql.append(TB_TABLES);
-        sql.append("(");
-        sql.append(C_TABLE_ID);
-        sql.append(")");
 
+
+        sql = "CREATE UNIQUE INDEX idx_tables_name on TABLES (TABLE_NAME)";
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
+
         stmt.executeBatch();
         LOG.info("Table '" + TB_TABLES + "' is created.");
         baseTableMaps.put(TB_TABLES, true);
@@ -118,42 +148,19 @@ public class DerbyStore extends AbstractDBStore {
 
       // COLUMNS
       if (!baseTableMaps.get(TB_COLUMNS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ");
-        sql.append(TB_COLUMNS);
-        sql.append(" (");
-        sql.append("TID INT NOT NULL REFERENCES ");
-        sql.append(TB_TABLES);
-        sql.append(" (TID) ON DELETE CASCADE, ");
-        sql.append(C_TABLE_ID);
-        sql.append( " VARCHAR(255) NOT NULL REFERENCES ");
-        sql.append(TB_TABLES);
-        sql.append("(");
-        sql.append(C_TABLE_ID);
-        sql.append(") ON DELETE CASCADE, ");
-        sql.append("column_id INT NOT NULL,");
-        sql.append("column_name VARCHAR(255) NOT NULL, ");
-        sql.append("data_type CHAR(16), type_length INTEGER, ");
-        sql.append("CONSTRAINT C_COLUMN_ID UNIQUE (");
-        sql.append(C_TABLE_ID);
-        sql.append(", column_name))");
+        String sql = readSchemaFile("columns.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append( "CREATE UNIQUE INDEX idx_fk_columns_table_name on ");
-        sql.append(TB_COLUMNS);
-        sql.append("(");
-        sql.append(C_TABLE_ID);
-        sql.append(", column_name)");
+        sql = "CREATE UNIQUE INDEX idx_fk_columns_table_name on " + TB_COLUMNS + "(TID, COLUMN_NAME)";
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
         stmt.executeBatch();
         LOG.info("Table '" + TB_COLUMNS + " is created.");
         baseTableMaps.put(TB_COLUMNS, true);
@@ -161,104 +168,68 @@ public class DerbyStore extends AbstractDBStore {
 
       // OPTIONS
       if (!baseTableMaps.get(TB_OPTIONS)) {
-        sql.delete(0, sql.length());
-        sql.append( "CREATE TABLE ");
-        sql.append(TB_OPTIONS);
-        sql.append(" (").append(C_TABLE_ID);
-        sql.append(" VARCHAR(255) NOT NULL REFERENCES TABLES (");
-        sql.append(C_TABLE_ID).append(") ON DELETE CASCADE, ");
-        sql.append("key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)");
+        String sql = readSchemaFile("table_properties.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE INDEX idx_options_key on ");
-        sql.append(TB_OPTIONS).append( " (").append(C_TABLE_ID).append(")");
 
+        sql = "CREATE INDEX idx_options_key on " + TB_OPTIONS + "(TID)";
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE INDEX idx_options_table_name on ").append(TB_OPTIONS);
-        sql.append("(" ).append(C_TABLE_ID).append(")");
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
-        }
-        stmt.addBatch(sql.toString());
         stmt.executeBatch();
+
         LOG.info("Table '" + TB_OPTIONS + " is created.");
         baseTableMaps.put(TB_OPTIONS, true);
       }
 
       // INDEXES
       if (!baseTableMaps.get(TB_INDEXES)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_INDEXES).append("(");
-        sql.append( "index_name VARCHAR(255) NOT NULL PRIMARY KEY, ");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL REFERENCES TABLES (");
-        sql.append(C_TABLE_ID).append(") ");
-        sql.append("ON DELETE CASCADE, ");
-        sql.append("column_name VARCHAR(255) NOT NULL, ");
-        sql.append("data_type VARCHAR(255) NOT NULL, ");
-        sql.append("index_type CHAR(32) NOT NULL, ");
-        sql.append("is_unique BOOLEAN NOT NULL, ");
-        sql.append("is_clustered BOOLEAN NOT NULL, ");
-        sql.append("is_ascending BOOLEAN NOT NULL)");
+        String sql = readSchemaFile("indexes.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE UNIQUE INDEX idx_indexes_key ON ");
-        sql.append(TB_INDEXES).append(" (index_name)");
+        sql = "CREATE UNIQUE INDEX idx_indexes_pk ON " + TB_INDEXES + "(" + COL_DATABASES_PK + ",index_name)";
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE INDEX idx_indexes_columns ON ");
-        sql.append(TB_INDEXES).append(" (").append(C_TABLE_ID).append(", column_name)");
+        sql = "CREATE INDEX idx_indexes_columns ON " + TB_INDEXES + "(" + COL_DATABASES_PK + ",column_name)";
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
         stmt.executeBatch();
         LOG.info("Table '" + TB_INDEXES + "' is created.");
         baseTableMaps.put(TB_INDEXES, true);
       }
 
       if (!baseTableMaps.get(TB_STATISTICS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_STATISTICS).append( "(");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL REFERENCES TABLES (");
-        sql.append(C_TABLE_ID).append(") ");
-        sql.append("ON DELETE CASCADE, ");
-        sql.append("num_rows BIGINT, ");
-        sql.append("num_bytes BIGINT)");
+        String sql = readSchemaFile("stats.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
+
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE INDEX idx_stats_table_name ON ");
-        sql.append(TB_STATISTICS).append(" (").append(C_TABLE_ID).append(")");
+        sql = "CREATE UNIQUE INDEX idx_stats_table_name ON " + TB_STATISTICS + "(TID)";
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
         stmt.executeBatch();
         LOG.info("Table '" + TB_STATISTICS + "' is created.");
         baseTableMaps.put(TB_STATISTICS, true);
@@ -266,28 +237,19 @@ public class DerbyStore extends AbstractDBStore {
 
       // PARTITION_METHODS
       if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_PARTITION_METHODS).append(" (");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL REFERENCES TABLES (");
-        sql.append(C_TABLE_ID).append(") ");
-        sql.append("ON DELETE CASCADE, ");
-        sql.append("partition_type VARCHAR(10) NOT NULL,");
-        sql.append("expression VARCHAR(1024) NOT NULL,");
-        sql.append("expression_schema VARCHAR(1024) FOR BIT DATA NOT NULL)");
+        String sql = readSchemaFile("partition_methods.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE INDEX idx_partition_methods_table_name ON ");
-        sql.append(TB_PARTITION_METHODS).append(" (").append(C_TABLE_ID).append(")");
 
+        sql = "CREATE INDEX idx_partition_methods_table_id ON " + TB_PARTITION_METHODS + "(TID)";
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
         stmt.executeBatch();
         LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
         baseTableMaps.put(TB_PARTITION_METHODS, true);
@@ -295,67 +257,54 @@ public class DerbyStore extends AbstractDBStore {
 
       // PARTITIONS
       if (!baseTableMaps.get(TB_PARTTIONS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_PARTTIONS).append("(");
-        sql.append("PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL REFERENCES TABLES (");
-        sql.append(C_TABLE_ID).append(")");
-        sql.append("ON DELETE CASCADE, ");
-        sql.append("partition_name VARCHAR(255), ");
-        sql.append("ordinal_position INT NOT NULL,");
-        sql.append("partition_value VARCHAR(1024),");
-        sql.append("path VARCHAR(1024),");
-        sql.append("cache_nodes VARCHAR(255), ");
-        sql.append(" CONSTRAINT PARTITION_PK PRIMARY KEY (PID))");
+        String sql = readSchemaFile("partitions.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
+
 
-        sql.delete(0, sql.length());
-        sql.append("CREATE INDEX idx_partitions_table_name ON ");
-        sql.append(TB_PARTTIONS).append(" (").append(C_TABLE_ID).append(")");
+        sql = "CREATE INDEX idx_partitions_table_name ON PARTITIONS(TID)";
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
-        stmt.addBatch(sql.toString());
+        stmt.addBatch(sql);
         stmt.executeBatch();
         LOG.info("Table '" + TB_PARTTIONS + "' is created.");
         baseTableMaps.put(TB_PARTTIONS, true);
       }
 
+      insertSchemaVersion();
+
     } catch (SQLException se) {
-      throw new CatalogException(se);
+      throw new CatalogException("failed to create base tables for Derby catalog store.", se);
     } finally {
-      CatalogUtil.closeQuietly(conn, stmt);
+      CatalogUtil.closeQuietly(stmt);
     }
   }
 
   @Override
   protected void dropBaseTable() throws CatalogException {
-    Connection conn = null;
+    Connection conn;
     Statement stmt = null;
     Map<String, Boolean> droppedTable = new HashMap<String, Boolean>();
 
     try {
       conn = getConnection();
       stmt = conn.createStatement();
-      StringBuilder sql = new StringBuilder();
 
       for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
         if(entry.getValue() && !entry.getKey().equals(TB_TABLES)) {
-          sql.delete(0, sql.length());
-          sql.append("DROP TABLE ").append(entry.getKey());
-          stmt.addBatch(sql.toString());
+          String sql = "DROP TABLE " + entry.getKey();
+          stmt.addBatch(sql);
           droppedTable.put(entry.getKey(), true);
         }
       }
       if(baseTableMaps.get(TB_TABLES)) {
-        sql.delete(0, sql.length());
-        sql.append("DROP TABLE ").append(TB_TABLES);
-        stmt.addBatch(sql.toString());
+        String sql = "DROP TABLE " + TB_TABLES;
+        stmt.addBatch(sql);
         droppedTable.put(TB_TABLES, true);
       }
       stmt.executeBatch();
@@ -366,15 +315,14 @@ public class DerbyStore extends AbstractDBStore {
     } catch (SQLException se) {
       throw new CatalogException(se);
     } finally {
-      CatalogUtil.closeQuietly(conn, stmt);
+      CatalogUtil.closeQuietly(stmt);
     }
   }
 
   @Override
   protected boolean isInitialized() throws CatalogException {
-    Connection conn = null;
+    Connection conn;
     ResultSet res = null;
-    int foundCount = 0;
 
     try {
       conn = getConnection();
@@ -382,6 +330,8 @@ public class DerbyStore extends AbstractDBStore {
           new String [] {"TABLE"});
 
       baseTableMaps.put(TB_META, false);
+      baseTableMaps.put(TB_SPACES, false);
+      baseTableMaps.put(TB_DATABASES, false);
       baseTableMaps.put(TB_TABLES, false);
       baseTableMaps.put(TB_COLUMNS, false);
       baseTableMaps.put(TB_OPTIONS, false);
@@ -396,7 +346,7 @@ public class DerbyStore extends AbstractDBStore {
     } catch (SQLException se){
       throw  new CatalogException(se);
     } finally {
-      CatalogUtil.closeQuietly(conn, res);
+      CatalogUtil.closeQuietly(res);
     }
 
     for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
@@ -408,28 +358,6 @@ public class DerbyStore extends AbstractDBStore {
     return true;
   }
 
-  final boolean checkInternalTable(final String tableName) throws CatalogException {
-    Connection conn = null;
-    ResultSet res = null;
-    boolean found = false;
-
-    try {
-      conn = getConnection();
-      res = conn.getMetaData().getTables(null, null, null,
-          new String [] {"TABLE"});
-      while(res.next() && !found) {
-        if (tableName.equals(res.getString("TABLE_NAME")))
-          found = true;
-      }
-
-    } catch (SQLException se) {
-      throw new CatalogException(se);
-    } finally {
-      CatalogUtil.closeQuietly(conn, res);
-    }
-    return found;
-  }
-
 
   @Override
   public final void close() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index 1fef286..9fc9d2a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -25,64 +25,134 @@ import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.FunctionDesc;
-import org.apache.tajo.catalog.exception.CatalogException;
+import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+/**
+ * CatalogServer guarantees that all operations are thread-safe.
+ * So, we don't need to consider concurrency problem here.
+ */
 public class MemStore implements CatalogStore {
-  private final Map<String,CatalogProtos.TableDescProto> tables = Maps.newHashMap();
+  private final Map<String, String> tablespaces = Maps.newHashMap();
+  private final Map<String, Map<String, CatalogProtos.TableDescProto>> databases = Maps.newHashMap();
   private final Map<String, CatalogProtos.FunctionDescProto> functions = Maps.newHashMap();
-  private final Map<String, IndexDescProto> indexes = Maps.newHashMap();
-  private final Map<String, IndexDescProto> indexesByColumn = Maps.newHashMap();
+  private final Map<String, Map<String, IndexDescProto>> indexes = Maps.newHashMap();
+  private final Map<String, Map<String, IndexDescProto>> indexesByColumn = Maps.newHashMap();
   
   public MemStore(Configuration conf) {
   }
 
-  /* (non-Javadoc)
-   * @see java.io.Closeable#close()
-   */
   @Override
   public void close() throws IOException {
-    tables.clear();
+    databases.clear();
     functions.clear();
     indexes.clear();
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#addTable(TableDesc)
-   */
   @Override
-  public void addTable(CatalogProtos.TableDescProto desc) throws CatalogException {
-    synchronized(tables) {
-      String tableId = desc.getId().toLowerCase();
-      tables.put(tableId, desc);
+  public void createTablespace(String spaceName, String spaceUri) throws CatalogException {
+    if (tablespaces.containsKey(spaceName)) {
+      throw new AlreadyExistsTablespaceException(spaceName);
     }
+
+    tablespaces.put(spaceName, spaceUri);
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#existTable(java.lang.String)
-   */
   @Override
-  public boolean existTable(String name) throws CatalogException {
-    synchronized(tables) {
-      String tableId = name.toLowerCase();
-      return tables.containsKey(tableId);
+  public boolean existTablespace(String spaceName) throws CatalogException {
+    return tablespaces.containsKey(spaceName);
+  }
+
+  @Override
+  public void dropTablespace(String spaceName) throws CatalogException {
+    if (!tablespaces.containsKey(spaceName)) {
+      throw new NoSuchTablespaceException(spaceName);
     }
+    tablespaces.remove(spaceName);
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#deleteTable(java.lang.String)
+  @Override
+  public Collection<String> getAllTablespaceNames() throws CatalogException {
+    return tablespaces.keySet();
+  }
+
+  @Override
+  public void createDatabase(String databaseName, String tablespaceName) throws CatalogException {
+    if (databases.containsKey(databaseName)) {
+      throw new AlreadyExistsDatabaseException(databaseName);
+    }
+
+    databases.put(databaseName, new HashMap<String, CatalogProtos.TableDescProto>());
+  }
+
+  @Override
+  public boolean existDatabase(String databaseName) throws CatalogException {
+    return databases.containsKey(databaseName);
+  }
+
+  @Override
+  public void dropDatabase(String databaseName) throws CatalogException {
+    if (!databases.containsKey(databaseName)) {
+      throw new NoSuchDatabaseException(databaseName);
+    }
+    databases.remove(databaseName);
+  }
+
+  @Override
+  public Collection<String> getAllDatabaseNames() throws CatalogException {
+    return databases.keySet();
+  }
+
+  /**
+   * Get a database namespace from a Map instance.
    */
+  private <T> Map<String, T> checkAndGetDatabaseNS(final Map<String, Map<String, T>> databaseMap,
+                                                   String databaseName) {
+    if (databaseMap.containsKey(databaseName)) {
+      return databaseMap.get(databaseName);
+    } else {
+      throw new NoSuchDatabaseException(databaseName);
+    }
+  }
+
+  @Override
+  public void createTable(CatalogProtos.TableDescProto request) throws CatalogException {
+    String [] splitted = CatalogUtil.splitTableName(CatalogUtil.normalizeIdentifier(request.getTableName()));
+    if (splitted.length == 1) {
+      throw new IllegalArgumentException("createTable() requires a qualified table name, but it is \""
+          + request.getTableName() + "\".");
+    }
+    String databaseName = splitted[0];
+    String tableName = splitted[1];
+
+    Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
+
+    String tbName = CatalogUtil.normalizeIdentifier(tableName);
+    if (database.containsKey(tbName)) {
+      throw new AlreadyExistsTableException(tbName);
+    }
+    database.put(tbName, request);
+  }
+
+  @Override
+  public boolean existTable(String dbName, String tbName) throws CatalogException {
+    Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, dbName);
+
+    return database.containsKey(tbName);
+  }
+
   @Override
-  public void deleteTable(String name) throws CatalogException {
-    synchronized(tables) {
-      String tableId = name.toLowerCase();
-      tables.remove(tableId);
+  public void dropTable(String dbName, String tbName) throws CatalogException {
+    Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, dbName);
+
+    if (database.containsKey(tbName)) {
+      database.remove(tbName);
+    } else {
+      throw new NoSuchTableException(tbName);
     }
   }
 
@@ -90,24 +160,30 @@ public class MemStore implements CatalogStore {
    * @see CatalogStore#getTable(java.lang.String)
    */
   @Override
-  public CatalogProtos.TableDescProto getTable(String name) throws CatalogException {
-    String tableId = name.toLowerCase();
-    CatalogProtos.TableDescProto unqualified = tables.get(tableId);
-    if(unqualified == null)
-      return null;
-    CatalogProtos.TableDescProto.Builder builder = CatalogProtos.TableDescProto.newBuilder();
-    CatalogProtos.SchemaProto schemaProto = CatalogUtil.getQualfiedSchema(tableId, unqualified.getSchema());
-    builder.mergeFrom(unqualified);
-    builder.setSchema(schemaProto);
-    return builder.build();
+  public CatalogProtos.TableDescProto getTable(String databaseName, String tableName)
+      throws CatalogException {
+    Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
+
+    if (database.containsKey(tableName)) {
+      CatalogProtos.TableDescProto unqualified = database.get(tableName);
+      CatalogProtos.TableDescProto.Builder builder = CatalogProtos.TableDescProto.newBuilder();
+      CatalogProtos.SchemaProto schemaProto =
+          CatalogUtil.getQualfiedSchema(databaseName + "." + tableName, unqualified.getSchema());
+      builder.mergeFrom(unqualified);
+      builder.setSchema(schemaProto);
+      return builder.build();
+    } else {
+      throw new NoSuchTableException(tableName);
+    }
   }
 
   /* (non-Javadoc)
    * @see CatalogStore#getAllTableNames()
    */
   @Override
-  public List<String> getAllTableNames() throws CatalogException {
-    return new ArrayList<String>(tables.keySet());
+  public List<String> getAllTableNames(String databaseName) throws CatalogException {
+    Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
+    return new ArrayList<String>(database.keySet());
   }
 
   @Override
@@ -116,21 +192,33 @@ public class MemStore implements CatalogStore {
   }
 
   @Override
-  public CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws CatalogException {
-    String tableId = tableName.toLowerCase();
-    CatalogProtos.TableDescProto table = tables.get(tableId);
-    return (table != null && table.hasPartition()) ? table.getPartition() : null;
+  public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName)
+      throws CatalogException {
+    Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
+
+    if (database.containsKey(tableName)) {
+      CatalogProtos.TableDescProto table = database.get(tableName);
+      return table.hasPartition() ? table.getPartition() : null;
+    } else {
+      throw new NoSuchTableException(tableName);
+    }
   }
 
   @Override
-  public boolean existPartitionMethod(String tableName) throws CatalogException {
-    String tableId = tableName.toLowerCase();
-    CatalogProtos.TableDescProto table = tables.get(tableId);
-    return (table != null && table.hasPartition());
+  public boolean existPartitionMethod(String databaseName, String tableName)
+      throws CatalogException {
+    Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
+
+    if (database.containsKey(tableName)) {
+      CatalogProtos.TableDescProto table = database.get(tableName);
+      return table.hasPartition();
+    } else {
+      throw new NoSuchTableException(tableName);
+    }
   }
 
   @Override
-  public void delPartitionMethod(String tableName) throws CatalogException {
+  public void dropPartitionMethod(String databaseName, String tableName) throws CatalogException {
     throw new RuntimeException("not supported!");
   }
 
@@ -140,7 +228,8 @@ public class MemStore implements CatalogStore {
   }
 
   @Override
-  public void addPartition(CatalogProtos.PartitionDescProto partitionDesc) throws CatalogException {
+  public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto
+      partitionDescProto) throws CatalogException {
     throw new RuntimeException("not supported!");
   }
 
@@ -160,105 +249,110 @@ public class MemStore implements CatalogStore {
   }
 
   @Override
-  public void delPartitions(String tableName) throws CatalogException {
+  public void dropPartitions(String tableName) throws CatalogException {
     throw new RuntimeException("not supported!");
   }
 
   /* (non-Javadoc)
-   * @see CatalogStore#addIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
+   * @see CatalogStore#createIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
    */
   @Override
-  public void addIndex(IndexDescProto proto) throws CatalogException {
-    synchronized(indexes) {
-      indexes.put(proto.getName(), proto);
-      indexesByColumn.put(proto.getTableId() + "." 
-          + CatalogUtil.extractSimpleName(proto.getColumn().getName()), proto);
+  public void createIndex(IndexDescProto proto) throws CatalogException {
+    final String databaseName = proto.getTableIdentifier().getDatabaseName();
+
+    Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName);
+    Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName);
+
+    if (index.containsKey(proto.getIndexName())) {
+      throw new AlreadyExistsIndexException(proto.getIndexName());
     }
+
+    index.put(proto.getIndexName(), proto);
+    indexByColumn.put(proto.getTableIdentifier().getTableName() + "."
+        + CatalogUtil.extractSimpleName(proto.getColumn().getName()), proto);
   }
 
   /* (non-Javadoc)
-   * @see CatalogStore#delIndex(java.lang.String)
+   * @see CatalogStore#dropIndex(java.lang.String)
    */
   @Override
-  public void delIndex(String indexName) throws CatalogException {
-    synchronized(indexes) {
-      indexes.remove(indexName);
+  public void dropIndex(String databaseName, String indexName) throws CatalogException {
+    Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName);
+    if (!index.containsKey(indexName)) {
+      throw new NoSuchIndexException(indexName);
     }
+    index.remove(indexName);
   }
 
   /* (non-Javadoc)
-   * @see CatalogStore#getIndex(java.lang.String)
+   * @see CatalogStore#getIndexByName(java.lang.String)
    */
   @Override
-  public IndexDescProto getIndex(String indexName) throws CatalogException {
-    return indexes.get(indexName);
+  public IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException {
+    Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName);
+    if (!index.containsKey(indexName)) {
+      throw new NoSuchIndexException(indexName);
+    }
+
+    return index.get(indexName);
   }
 
   /* (non-Javadoc)
-   * @see CatalogStore#getIndex(java.lang.String, java.lang.String)
+   * @see CatalogStore#getIndexByName(java.lang.String, java.lang.String)
    */
   @Override
-  public IndexDescProto getIndex(String tableName, String columnName) throws CatalogException {
-    return indexesByColumn.get(tableName+"."+columnName);
+  public IndexDescProto getIndexByColumn(String databaseName, String tableName, String columnName)
+      throws CatalogException {
+
+    Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName);
+    if (!indexByColumn.containsKey(columnName)) {
+      throw new NoSuchIndexException(columnName);
+    }
+
+    return indexByColumn.get(columnName);
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#existIndex(java.lang.String)
-   */
   @Override
-  public boolean existIndex(String indexName) throws CatalogException {
-    return indexes.containsKey(indexName);
+  public boolean existIndexByName(String databaseName, String indexName) throws CatalogException {
+    Map<String, IndexDescProto> index = checkAndGetDatabaseNS(indexes, databaseName);
+    return index.containsKey(indexName);
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#existIndex(java.lang.String, java.lang.String)
-   */
   @Override
-  public boolean existIndex(String tableName, String columnName) throws CatalogException {
-    return indexesByColumn.containsKey(tableName + "." + columnName);
+  public boolean existIndexByColumn(String databaseName, String tableName, String columnName)
+      throws CatalogException {
+    Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName);
+    return indexByColumn.containsKey(columnName);
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#getIndexes(java.lang.String)
-   */
   @Override
-  public IndexDescProto[] getIndexes(String tableName) throws CatalogException {
+  public IndexDescProto[] getIndexes(String databaseName, String tableName) throws CatalogException {
     List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
-    for (IndexDescProto proto : indexesByColumn.values()) {
-      if (proto.getTableId().equals(tableName)) {
+    Map<String, IndexDescProto> indexByColumn = checkAndGetDatabaseNS(indexesByColumn, databaseName);
+    for (IndexDescProto proto : indexByColumn.values()) {
+      if (proto.equals(tableName)) {
         protos.add(proto);
       }
     }
+
     return protos.toArray(new IndexDescProto[protos.size()]);
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#addFunction(FunctionDesc)
-   */
   @Override
   public void addFunction(FunctionDesc func) throws CatalogException {
     // to be implemented
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#deleteFunction(FunctionDesc)
-   */
   @Override
   public void deleteFunction(FunctionDesc func) throws CatalogException {
     // to be implemented
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#existFunction(FunctionDesc)
-   */
   @Override
   public void existFunction(FunctionDesc func) throws CatalogException {
     // to be implemented
   }
 
-  /* (non-Javadoc)
-   * @see CatalogStore#getAllFunctionNames()
-   */
   @Override
   public List<String> getAllFunctionNames() throws CatalogException {
     // to be implemented

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
index bbcdb72..849afc8 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -31,16 +31,25 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class MySQLStore extends AbstractDBStore  {
+  /** 2014-03-20: First versioning */
+  private static final int MYSQL_CATALOG_STORE_VERSION_2 = 2;
+  /** Before 2013-03-20 */
+  private static final int MYSQL_CATALOG_STORE_VERSION_1 = 1;
 
   private static final String CATALOG_DRIVER = "com.mysql.jdbc.Driver";
   protected String getCatalogDriverName(){
     return CATALOG_DRIVER;
   }
 
-  public MySQLStore(Configuration conf) throws InternalException {
+  public MySQLStore(final Configuration conf) throws InternalException {
     super(conf);
   }
 
+  @Override
+  public int getDriverVersion() {
+    return MYSQL_CATALOG_STORE_VERSION_2;
+  }
+
   protected Connection createConnection(Configuration conf) throws SQLException {
     Connection con = DriverManager.getConnection(getCatalogUri(), this.connectionId,
         this.connectionPassword);
@@ -48,6 +57,11 @@ public class MySQLStore extends AbstractDBStore  {
     return con;
   }
 
+  @Override
+  public String readSchemaFile(String filename) throws CatalogException {
+    return super.readSchemaFile("mysql/" + filename);
+  }
+
   // TODO - DDL and index statements should be renamed
   @Override
   protected void createBaseTable() throws CatalogException {
@@ -58,10 +72,10 @@ public class MySQLStore extends AbstractDBStore  {
       conn = getConnection();
       stmt = conn.createStatement();
 
-      StringBuilder sql = new StringBuilder();
+
       // META
       if (!baseTableMaps.get(TB_META)) {
-        sql.append("CREATE TABLE ").append(TB_META).append(" (version int NOT NULL)");
+        String sql = super.readSchemaFile("common/meta.sql");
 
         if (LOG.isDebugEnabled()) {
           LOG.debug(sql.toString());
@@ -72,40 +86,47 @@ public class MySQLStore extends AbstractDBStore  {
         baseTableMaps.put(TB_META, true);
       }
 
-      // TABLES
-      if (!baseTableMaps.get(TB_TABLES)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_TABLES).append("(");
-        sql.append("TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, ");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL UNIQUE, ");
-        sql.append("path TEXT, ").append("store_type CHAR(16)").append(")");
+      // TABLE SPACES
+      if (!baseTableMaps.get(TB_SPACES)) {
+        String sql = readSchemaFile("tablespaces.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
 
-        stmt.executeUpdate(sql.toString());
+        stmt.executeUpdate(sql);
+
+        LOG.info("Table '" + TB_SPACES + "' is created.");
+        baseTableMaps.put(TB_SPACES, true);
+      }
+
+      // DATABASES
+      if (!baseTableMaps.get(TB_DATABASES)) {
+        String sql = readSchemaFile("databases.sql");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        LOG.info("Table '" + TB_DATABASES + "' is created.");
+        baseTableMaps.put(TB_DATABASES, true);
+        stmt.executeUpdate(sql);
+      }
+
+      // TABLES
+      if (!baseTableMaps.get(TB_TABLES)) {
+        String sql = readSchemaFile("tables.sql");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.executeUpdate(sql);
         LOG.info("Table '" + TB_TABLES + "' is created.");
         baseTableMaps.put(TB_TABLES, true);
       }
 
       // COLUMNS
       if (!baseTableMaps.get(TB_COLUMNS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_COLUMNS).append("(");
-        sql.append("TID INT NOT NULL, ");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL,");
-        sql.append("column_id INT NOT NULL,");
-        sql.append("column_name VARCHAR(255) NOT NULL, ");
-        sql.append("data_type CHAR(16), ");
-        sql.append("type_length INTEGER, ");
-        sql.append("UNIQUE KEY(").append(C_TABLE_ID).append(", column_name),");
-        sql.append("FOREIGN KEY(TID) REFERENCES ").append(TB_TABLES).append("(TID) ON DELETE CASCADE,");
-        sql.append("FOREIGN KEY(").append(C_TABLE_ID).append(") REFERENCES ");
-        sql.append(TB_TABLES).append("(").append(C_TABLE_ID).append(") ON DELETE CASCADE)");
-
+        String sql = readSchemaFile("columns.sql");
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
 
         stmt.executeUpdate(sql.toString());
@@ -115,14 +136,7 @@ public class MySQLStore extends AbstractDBStore  {
 
       // OPTIONS
       if (!baseTableMaps.get(TB_OPTIONS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_OPTIONS).append("(");
-        sql.append(C_TABLE_ID).append( " VARCHAR(255) NOT NULL,");
-        sql.append("key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,");
-        sql.append("INDEX(").append(C_TABLE_ID).append(", key_),");
-        sql.append("FOREIGN KEY(").append(C_TABLE_ID);
-        sql.append(") REFERENCES ").append(TB_TABLES);
-        sql.append("(").append(C_TABLE_ID).append(") ON DELETE CASCADE)");
+        String sql = readSchemaFile("table_properties.sql");
 
         if (LOG.isDebugEnabled()) {
           LOG.debug(sql.toString());
@@ -135,20 +149,7 @@ public class MySQLStore extends AbstractDBStore  {
 
       // INDEXES
       if (!baseTableMaps.get(TB_INDEXES)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_INDEXES).append("(");
-        sql.append("index_name VARCHAR(255) NOT NULL PRIMARY KEY, ");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL,");
-        sql.append("column_name VARCHAR(255) NOT NULL, ");
-        sql.append("data_type VARCHAR(255) NOT NULL, ");
-        sql.append("index_type CHAR(32) NOT NULL, ");
-        sql.append("is_unique BOOLEAN NOT NULL, ");
-        sql.append("is_clustered BOOLEAN NOT NULL, ");
-        sql.append("is_ascending BOOLEAN NOT NULL,");
-        sql.append("INDEX(").append(C_TABLE_ID).append(", column_name),");
-        sql.append("FOREIGN KEY(").append(C_TABLE_ID);
-        sql.append(") REFERENCES ").append(TB_TABLES);
-        sql.append("(").append(C_TABLE_ID).append(") ON DELETE CASCADE)");
+        String sql = readSchemaFile("indexes.sql");
 
         if (LOG.isDebugEnabled()) {
           LOG.debug(sql.toString());
@@ -160,15 +161,7 @@ public class MySQLStore extends AbstractDBStore  {
       }
 
       if (!baseTableMaps.get(TB_STATISTICS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_STATISTICS).append("(");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) NOT NULL,");
-        sql.append("num_rows BIGINT, ");
-        sql.append("num_bytes BIGINT,");
-        sql.append("INDEX(").append(C_TABLE_ID).append("),");
-        sql.append("FOREIGN KEY(").append(C_TABLE_ID);
-        sql.append(") REFERENCES ").append(TB_TABLES);
-        sql.append("(").append(C_TABLE_ID).append(") ON DELETE CASCADE)");
+        String sql = readSchemaFile("stats.sql");
 
         if (LOG.isDebugEnabled()) {
           LOG.debug(sql.toString());
@@ -181,40 +174,20 @@ public class MySQLStore extends AbstractDBStore  {
 
       // PARTITION_METHODS
       if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_PARTITION_METHODS).append("(");
-        sql.append(C_TABLE_ID).append(" VARCHAR(255) PRIMARY KEY,");
-        sql.append("partition_type VARCHAR(10) NOT NULL,");
-        sql.append("expression TEXT NOT NULL,");
-        sql.append("expression_schema VARBINARY(1024) NOT NULL, ");
-        sql.append("FOREIGN KEY(").append(C_TABLE_ID);
-        sql.append(") REFERENCES ").append(TB_TABLES);
-        sql.append("(").append(C_TABLE_ID).append(") ON DELETE CASCADE)");
+        String sql = readSchemaFile("partition_methods.sql");
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql.toString());
+          LOG.debug(sql);
         }
 
-        stmt.executeUpdate(sql.toString());
+        stmt.executeUpdate(sql);
         LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
         baseTableMaps.put(TB_PARTITION_METHODS, true);
       }
 
       // PARTITIONS
       if (!baseTableMaps.get(TB_PARTTIONS)) {
-        sql.delete(0, sql.length());
-        sql.append("CREATE TABLE ").append(TB_PARTTIONS).append("(");
-        sql.append("PID int NOT NULL AUTO_INCREMENT PRIMARY KEY, ");
-        sql.append(C_TABLE_ID).append( " VARCHAR(255) NOT NULL,");
-        sql.append("partition_name VARCHAR(255), ");
-        sql.append("ordinal_position INT NOT NULL,");
-        sql.append("partition_value TEXT,");
-        sql.append("path TEXT,");
-        sql.append("cache_nodes VARCHAR(255), ");
-        sql.append("UNIQUE KEY(").append(C_TABLE_ID).append(", partition_name),");
-        sql.append("FOREIGN KEY(").append(C_TABLE_ID);
-        sql.append(") REFERENCES ").append(TB_TABLES);
-        sql.append("(").append(C_TABLE_ID).append(") ON DELETE CASCADE)");
+        String sql = readSchemaFile("partitions.sql");
 
         if (LOG.isDebugEnabled()) {
           LOG.debug(sql.toString());
@@ -224,10 +197,13 @@ public class MySQLStore extends AbstractDBStore  {
         LOG.info("Table '" + TB_PARTTIONS + "' is created.");
         baseTableMaps.put(TB_PARTTIONS, true);
       }
+
+      insertSchemaVersion();
+
     } catch (SQLException se) {
-      throw new CatalogException(se);
+      throw new CatalogException("failed to create base tables for MySQL catalog store", se);
     } finally {
-      CatalogUtil.closeQuietly(conn, stmt);
+      CatalogUtil.closeQuietly(stmt);
     }
   }
 
@@ -264,13 +240,13 @@ public class MySQLStore extends AbstractDBStore  {
     } catch (SQLException se) {
       throw new CatalogException(se);
     } finally {
-      CatalogUtil.closeQuietly(conn, stmt);
+      CatalogUtil.closeQuietly(stmt);
     }
   }
 
   @Override
   protected boolean isInitialized() throws CatalogException {
-    Connection conn = null;
+    Connection conn;
     ResultSet res = null;
 
     try {
@@ -279,6 +255,8 @@ public class MySQLStore extends AbstractDBStore  {
           new String[]{"TABLE"});
 
       baseTableMaps.put(TB_META, false);
+      baseTableMaps.put(TB_SPACES, false);
+      baseTableMaps.put(TB_DATABASES, false);
       baseTableMaps.put(TB_TABLES, false);
       baseTableMaps.put(TB_COLUMNS, false);
       baseTableMaps.put(TB_OPTIONS, false);
@@ -305,10 +283,9 @@ public class MySQLStore extends AbstractDBStore  {
     } catch(SQLException se) {
       throw new CatalogException(se);
     } finally {
-      CatalogUtil.closeQuietly(conn, res);
+      CatalogUtil.closeQuietly(res);
     }
 
     return  true;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/common/meta.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/common/meta.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/common/meta.sql
new file mode 100644
index 0000000..066c5f8
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/common/meta.sql
@@ -0,0 +1 @@
+CREATE TABLE META (VERSION INT NOT NULL)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/columns.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/columns.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/columns.sql
new file mode 100644
index 0000000..e274f3c
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/columns.sql
@@ -0,0 +1,8 @@
+CREATE TABLE COLUMNS (
+  TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE,
+  COLUMN_NAME VARCHAR(128) NOT NULL,
+  ORDINAL_POSITION INTEGER NOT NULL,
+  DATA_TYPE CHAR(16),
+  TYPE_LENGTH INTEGER,
+  CONSTRAINT COLUMNS_PK PRIMARY KEY (TID, COLUMN_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases.sql
new file mode 100644
index 0000000..8cabb14
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases.sql
@@ -0,0 +1,6 @@
+CREATE TABLE DATABASES_ (
+  DB_ID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+  DB_NAME VARCHAR(128) NOT NULL CONSTRAINT DB_NAME_UNIQ UNIQUE,
+  SPACE_ID INT NOT NULL REFERENCES TABLESPACES (SPACE_ID),
+  CONSTRAINT DATABASES_PK PRIMARY KEY (DB_ID)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases_idx.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases_idx.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases_idx.sql
new file mode 100644
index 0000000..c9c7d4e
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/databases_idx.sql
@@ -0,0 +1 @@
+CREATE UNIQUE INDEX idx_database_db_id on DATABASES_ (DB_ID)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/indexes.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/indexes.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/indexes.sql
new file mode 100644
index 0000000..c4cfc25
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/indexes.sql
@@ -0,0 +1,12 @@
+CREATE TABLE INDEXES (
+  DB_ID INT NOT NULL REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
+  TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE,
+  INDEX_NAME VARCHAR(128) NOT NULL,
+  COLUMN_NAME VARCHAR(128) NOT NULL,
+  DATA_TYPE VARCHAR(128) NOT NULL,
+  INDEX_TYPE CHAR(32) NOT NULL,
+  IS_UNIQUE BOOLEAN NOT NULL,
+  IS_CLUSTERED BOOLEAN NOT NULL,
+  IS_ASCENDING BOOLEAN NOT NULL,
+  CONSTRAINT C_INDEXES_PK PRIMARY KEY (DB_ID, INDEX_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partition_methods.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partition_methods.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partition_methods.sql
new file mode 100644
index 0000000..4ad4c60
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partition_methods.sql
@@ -0,0 +1,6 @@
+CREATE TABLE PARTITION_METHODS (
+  TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE,
+  PARTITION_TYPE VARCHAR(10) NOT NULL,
+  EXPRESSION VARCHAR(1024) NOT NULL,
+  EXPRESSION_SCHEMA VARCHAR(1024) FOR BIT DATA NOT NULL
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partitions.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partitions.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partitions.sql
new file mode 100644
index 0000000..24ee422
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/partitions.sql
@@ -0,0 +1,10 @@
+CREATE TABLE PARTITIONS (
+  PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+  TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE,
+  PARTITION_NAME VARCHAR(255),
+  ORDINAL_POSITION INT NOT NULL,
+  PARTITION_VALUE VARCHAR(1024),
+  PATH VARCHAR(1024),
+  CONSTRAINT C_PARTITION_PK PRIMARY KEY (PID),
+  CONSTRAINT C_PARTITION_UNIQUE UNIQUE (TID, PARTITION_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/stats.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/stats.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/stats.sql
new file mode 100644
index 0000000..bba8ee7
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/stats.sql
@@ -0,0 +1,6 @@
+CREATE TABLE STATS (
+  TID INT NOT NULL PRIMARY KEY,
+  NUM_ROWS BIGINT,
+  NUM_BYTES BIGINT,
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/table_properties.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/table_properties.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/table_properties.sql
new file mode 100644
index 0000000..2b2d89a
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/table_properties.sql
@@ -0,0 +1,6 @@
+CREATE TABLE OPTIONS (
+  TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE,
+  KEY_ VARCHAR(255) NOT NULL,
+  VALUE_ VARCHAR(255) NOT NULL,
+  CONSTRAINT C_OPTIONS_UNIQUE UNIQUE (TID, KEY_, VALUE_)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tables.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tables.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tables.sql
new file mode 100644
index 0000000..2fe0e7d
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tables.sql
@@ -0,0 +1,10 @@
+CREATE TABLE TABLES (
+  TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+  DB_ID int NOT NULL REFERENCES DATABASES_ (DB_ID),
+  TABLE_NAME VARCHAR(128) NOT NULL,
+  TABLE_TYPE VARCHAR(128) NOT NULL,
+  PATH VARCHAR(4096),
+  STORE_TYPE CHAR(16),
+  CONSTRAINT TABLES_PK PRIMARY KEY (TID),
+  CONSTRAINT C_TABLE_ID_UNIQ UNIQUE (TABLE_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tablespaces.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tablespaces.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tablespaces.sql
new file mode 100644
index 0000000..c2af3ae
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/tablespaces.sql
@@ -0,0 +1,7 @@
+CREATE TABLE TABLESPACES (
+  SPACE_ID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
+  SPACE_NAME VARCHAR(128) NOT NULL CONSTRAINT SPACE_UNIQUE UNIQUE,
+  SPACE_HANDLER VARCHAR (1024) DEFAULT 'HDFS',
+  SPACE_URI VARCHAR (4096) NOT NULL,
+  CONSTRAINT C_SPACE_PK PRIMARY KEY (SPACE_ID)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/columns.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/columns.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/columns.sql
new file mode 100644
index 0000000..51e2a1b
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/columns.sql
@@ -0,0 +1,9 @@
+CREATE TABLE COLUMNS (
+  TID INT NOT NULL,
+  COLUMN_NAME VARCHAR(255) NOT NULL,
+  ORDINAL_POSITION INT NOT NULL,
+  DATA_TYPE CHAR(16),
+  TYPE_LENGTH INTEGER,
+  PRIMARY KEY (TID, COLUMN_NAME),
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/databases.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/databases.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/databases.sql
new file mode 100644
index 0000000..e07e916
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/databases.sql
@@ -0,0 +1,7 @@
+CREATE TABLE DATABASES_ (
+  DB_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  DB_NAME VARCHAR(128) NOT NULL UNIQUE,
+  SPACE_ID INT NOT NULL,
+  FOREIGN KEY (SPACE_ID) REFERENCES TABLESPACES (SPACE_ID),
+  UNIQUE INDEX IDX_NAME (DB_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql
new file mode 100644
index 0000000..62feb36
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/indexes.sql
@@ -0,0 +1,16 @@
+CREATE TABLE INDEXES (
+  DB_ID INT NOT NULL,
+  TID INT NOT NULL,
+  INDEX_NAME VARCHAR(128) NOT NULL,
+  COLUMN_NAME VARCHAR(128) NOT NULL,
+  DATA_TYPE VARCHAR(128) NOT NULL,
+  INDEX_TYPE CHAR(32) NOT NULL,
+  IS_UNIQUE BOOLEAN NOT NULL,
+  IS_CLUSTERED BOOLEAN NOT NULL,
+  IS_ASCENDING BOOLEAN NOT NULL,
+  PRIMARY KEY (DB_ID, INDEX_NAME),
+  FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID) ON DELETE CASCADE,
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
+  UNIQUE INDEX IDX_DB_ID_NAME (DB_ID, INDEX_NAME),
+  INDEX IDX_TID_COLUMN_NAME (TID, COLUMN_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_methods.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_methods.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_methods.sql
new file mode 100644
index 0000000..060c4c8
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_methods.sql
@@ -0,0 +1,7 @@
+CREATE TABLE PARTITION_METHODS (
+  TID INT NOT NULL PRIMARY KEY,
+  PARTITION_TYPE VARCHAR(10) NOT NULL,
+  EXPRESSION VARCHAR(1024) NOT NULL,
+  EXPRESSION_SCHEMA VARCHAR(1024) NOT NULL,
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql
new file mode 100644
index 0000000..428f5a4
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql
@@ -0,0 +1,12 @@
+CREATE TABLE PARTITIONS (
+  PID INT NOT NULL PRIMARY KEY,
+  TID INT NOT NULL,
+  PARTITION_NAME VARCHAR(128),
+  ORDINAL_POSITION INT NOT NULL,
+  PARTITION_VALUE VARCHAR(1024),
+  PATH VARCHAR(4096),
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE,
+  CONSTRAINT C_PARTITION_UNIQUE UNIQUE (TID, PARTITION_NAME),
+  INDEX IDX_TID (TID),
+  UNIQUE INDEX IDX_TID_NAME (TID, PARTITION_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/stats.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/stats.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/stats.sql
new file mode 100644
index 0000000..bba8ee7
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/stats.sql
@@ -0,0 +1,6 @@
+CREATE TABLE STATS (
+  TID INT NOT NULL PRIMARY KEY,
+  NUM_ROWS BIGINT,
+  NUM_BYTES BIGINT,
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/table_properties.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/table_properties.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/table_properties.sql
new file mode 100644
index 0000000..78e281e
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/table_properties.sql
@@ -0,0 +1,7 @@
+CREATE TABLE OPTIONS (
+  TID INT NOT NULL,
+  KEY_ VARCHAR(255) NOT NULL,
+  VALUE_ VARCHAR(255) NOT NULL,
+  PRIMARY KEY (TID, KEY_),
+  FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tables.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tables.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tables.sql
new file mode 100644
index 0000000..98c0c94
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tables.sql
@@ -0,0 +1,11 @@
+CREATE TABLE TABLES (
+  TID INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  DB_ID INT NOT NULL,
+  TABLE_NAME VARCHAR(128) NOT NULL UNIQUE,
+  TABLE_TYPE VARCHAR(128) NOT NULL,
+  PATH VARCHAR(4096),
+  STORE_TYPE CHAR(16),
+  FOREIGN KEY (DB_ID) REFERENCES DATABASES_ (DB_ID),
+  INDEX IDX_DB_ID (DB_ID),
+  UNIQUE INDEX IDX_TABLE_ID (TABLE_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tablespaces.sql
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tablespaces.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tablespaces.sql
new file mode 100644
index 0000000..f2e2299
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/tablespaces.sql
@@ -0,0 +1,7 @@
+CREATE TABLE TABLESPACES (
+  SPACE_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  SPACE_NAME VARCHAR(128) NOT NULL UNIQUE,
+  SPACE_HANDLER VARCHAR (1024) DEFAULT 'HDFS',
+  SPACE_URI VARCHAR (4096) NOT NULL,
+  UNIQUE INDEX IDX_NAME (SPACE_NAME)
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 08905dd..cfae15d 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -18,7 +18,10 @@
 
 package org.apache.tajo.catalog;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.exception.CatalogException;
 import org.apache.tajo.catalog.exception.NoSuchFunctionException;
 import org.apache.tajo.catalog.function.Function;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -26,16 +29,22 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.store.DerbyStore;
+import org.apache.tajo.catalog.store.MySQLStore;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.*;
 
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.catalog.CatalogConstants.CATALOG_URI;
 import static org.junit.Assert.*;
 
 public class TestCatalog {
@@ -50,18 +59,53 @@ public class TestCatalog {
 
 	@BeforeClass
 	public static void setUp() throws Exception {
-    TajoConf conf = new TajoConf();
+    final String HCATALOG_CLASS_NAME = "org.apache.tajo.catalog.store.HCatalogStore";
+
+    String driverClass = System.getProperty(CatalogConstants.STORE_CLASS);
+
+    // here, we don't choose HCatalogStore due to some dependency problems.
+    if (driverClass == null || driverClass.equals(HCATALOG_CLASS_NAME)) {
+      driverClass = DerbyStore.class.getCanonicalName();
+    }
+    String catalogURI = System.getProperty(CatalogConstants.CATALOG_URI);
+    if (catalogURI == null) {
+      Path path = CommonTestingUtil.getTestDir();
+      catalogURI = String.format("jdbc:derby:%s/db;create=true", path.toUri().getPath());
+    }
+    String connectionId = System.getProperty(CatalogConstants.CONNECTION_ID);
+    String password = System.getProperty(CatalogConstants.CONNECTION_PASSWORD);
 
-    conf.set(CatalogConstants.CATALOG_URI, "jdbc:derby:target/test-data/TestCatalog/db;create=true");
+    TajoConf conf = new TajoConf();
+    conf.set(CatalogConstants.STORE_CLASS, driverClass);
+    conf.set(CATALOG_URI, catalogURI);
     conf.setVar(TajoConf.ConfVars.CATALOG_ADDRESS, "127.0.0.1:0");
 
+    // MySQLStore requires password
+    if (driverClass.equals(MySQLStore.class.getCanonicalName())) {
+      if (connectionId == null) {
+        throw new CatalogException(String.format("%s driver requires %s", driverClass, CatalogConstants.CONNECTION_ID));
+      }
+      conf.set(CatalogConstants.CONNECTION_ID, connectionId);
+      if (password != null) {
+        conf.set(CatalogConstants.CONNECTION_PASSWORD, password);
+      }
+    }
+
+    Path defaultTableSpace = CommonTestingUtil.getTestDir();
+
 	  server = new CatalogServer();
     server.init(conf);
     server.start();
     catalog = new LocalCatalogWrapper(server);
+    if (!catalog.existTablespace(TajoConstants.DEFAULT_TABLESPACE_NAME)) {
+      catalog.createTablespace(TajoConstants.DEFAULT_TABLESPACE_NAME, defaultTableSpace.toUri().toString());
+    }
+    if (!catalog.existDatabase(DEFAULT_DATABASE_NAME)) {
+      catalog.createDatabase(DEFAULT_DATABASE_NAME, TajoConstants.DEFAULT_TABLESPACE_NAME);
+    }
 
-    for(String table : catalog.getAllTableNames()){
-      catalog.deleteTable(table);
+    for(String table : catalog.getAllTableNames(DEFAULT_DATABASE_NAME)) {
+      catalog.dropTable(table);
     }
 	}
 	
@@ -69,6 +113,161 @@ public class TestCatalog {
 	public static void tearDown() throws IOException {
 	  server.stop();
 	}
+
+  @Test
+  public void testCreateAndDropDatabases() throws Exception {
+    assertFalse(catalog.existDatabase("testCreateAndDropDatabases"));
+    assertTrue(catalog.createDatabase("testCreateAndDropDatabases", TajoConstants.DEFAULT_TABLESPACE_NAME));
+    assertTrue(catalog.existDatabase("testCreateAndDropDatabases"));
+    assertTrue(catalog.dropDatabase("testCreateAndDropDatabases"));
+  }
+
+  @Test
+  public void testCreateAndDropManyDatabases() throws Exception {
+    List<String> createdDatabases = new ArrayList<String>();
+    String namePrefix = "database_";
+    final int NUM = 10;
+    for (int i = 0; i < NUM; i++) {
+      String databaseName = namePrefix + i;
+      assertFalse(catalog.existDatabase(databaseName));
+      assertTrue(catalog.createDatabase(databaseName, TajoConstants.DEFAULT_TABLESPACE_NAME));
+      assertTrue(catalog.existDatabase(databaseName));
+      createdDatabases.add(databaseName);
+    }
+
+    Collection<String> allDatabaseNames = catalog.getAllDatabaseNames();
+    for (String databaseName : allDatabaseNames) {
+      assertTrue(databaseName.equals(DEFAULT_DATABASE_NAME) || createdDatabases.contains(databaseName));
+    }
+    // additional one is 'default' database.
+    assertEquals(NUM + 1, allDatabaseNames.size());
+
+    Collections.shuffle(createdDatabases);
+    for (String tobeDropped : createdDatabases) {
+      assertTrue(catalog.existDatabase(tobeDropped));
+      assertTrue(catalog.dropDatabase(tobeDropped));
+      assertFalse(catalog.existDatabase(tobeDropped));
+    }
+  }
+
+  private TableDesc createMockupTable(String databaseName, String tableName) throws IOException {
+    schema1 = new Schema();
+    schema1.addColumn(FieldName1, Type.BLOB);
+    schema1.addColumn(FieldName2, Type.INT4);
+    schema1.addColumn(FieldName3, Type.INT8);
+    Path path = new Path(CommonTestingUtil.getTestDir(), tableName);
+    TableDesc table = new TableDesc(
+        CatalogUtil.buildFQName(databaseName, tableName),
+        schema1,
+        new TableMeta(StoreType.CSV, new Options()),
+        path, true);
+    return table;
+  }
+
+  @Test
+  public void testCreateAndDropTable() throws Exception {
+    assertTrue(catalog.createDatabase("tmpdb1", TajoConstants.DEFAULT_TABLESPACE_NAME));
+    assertTrue(catalog.existDatabase("tmpdb1"));
+    assertTrue(catalog.createDatabase("tmpdb2", TajoConstants.DEFAULT_TABLESPACE_NAME));
+    assertTrue(catalog.existDatabase("tmpdb2"));
+
+    TableDesc table1 = createMockupTable("tmpdb1", "table1");
+    assertTrue(catalog.createTable(table1));
+
+    TableDesc table2 = createMockupTable("tmpdb2", "table2");
+    assertTrue(catalog.createTable(table2));
+
+    Set<String> tmpdb1 = Sets.newHashSet(catalog.getAllTableNames("tmpdb1"));
+    assertEquals(1, tmpdb1.size());
+    assertTrue(tmpdb1.contains("table1"));
+
+
+    Set<String> tmpdb2 = Sets.newHashSet(catalog.getAllTableNames("tmpdb2"));
+    assertEquals(1, tmpdb2.size());
+    assertTrue(tmpdb2.contains("table2"));
+
+    assertTrue(catalog.dropDatabase("tmpdb1"));
+    assertFalse(catalog.existDatabase("tmpdb1"));
+
+    tmpdb2 = Sets.newHashSet(catalog.getAllTableNames("tmpdb2"));
+    assertEquals(1, tmpdb2.size());
+    assertTrue(tmpdb2.contains("table2"));
+
+    assertTrue(catalog.dropDatabase("tmpdb2"));
+    assertFalse(catalog.existDatabase("tmpdb2"));
+  }
+
+  static String dbPrefix = "db_";
+  static String tablePrefix = "tb_";
+  static final int DB_NUM = 5;
+  static final int TABLE_NUM_PER_DB = 3;
+  static final int TOTAL_TABLE_NUM = DB_NUM * TABLE_NUM_PER_DB;
+
+  private Map<String, List<String>> createBaseDatabaseAndTables() throws IOException {
+
+    Map<String, List<String>> createdDatabaseAndTablesMap = new HashMap<String, List<String>>();
+
+    // add and divide all tables to multiple databases in a round robin manner
+    for (int tableId = 0; tableId < TOTAL_TABLE_NUM; tableId++) {
+      int dbIdx = tableId % DB_NUM;
+      String databaseName = dbPrefix + dbIdx;
+
+      if (!catalog.existDatabase(databaseName)) {
+        assertTrue(catalog.createDatabase(databaseName, TajoConstants.DEFAULT_TABLESPACE_NAME));
+      }
+
+      String tableName = tablePrefix + tableId;
+      TableDesc table = createMockupTable(databaseName, tableName);
+      assertTrue(catalog.createTable(table));
+
+      TUtil.putToNestedList(createdDatabaseAndTablesMap, databaseName, tableName);
+    }
+
+    // checking all tables for each database
+    for (int dbIdx = 0; dbIdx < DB_NUM; dbIdx++) {
+      String databaseName = dbPrefix + dbIdx;
+
+      Collection<String> tableNames = catalog.getAllTableNames(databaseName);
+      assertTrue(createdDatabaseAndTablesMap.containsKey(databaseName));
+
+      assertEquals(createdDatabaseAndTablesMap.get(databaseName).size(), tableNames.size());
+      for (String tableName : tableNames) {
+        assertTrue(createdDatabaseAndTablesMap.get(databaseName).contains(tableName));
+      }
+    }
+
+    return createdDatabaseAndTablesMap;
+  }
+
+  @Test
+  public void testDropDatabaseWithAllTables() throws Exception {
+    Map<String, List<String>> createdTablesMap = createBaseDatabaseAndTables();
+
+    // Each time we drop one database, check all databases and their tables.
+    Iterator<String> it = new ArrayList<String>(createdTablesMap.keySet()).iterator();
+    while(it.hasNext()) {
+      // drop one database
+      String databaseName = it.next();
+      assertTrue(catalog.existDatabase(databaseName));
+      catalog.dropDatabase(databaseName);
+      createdTablesMap.remove(databaseName);
+
+      // check all tables which belong to other databases
+      for (Map.Entry<String, List<String>> entry : createdTablesMap.entrySet()) {
+        assertTrue(catalog.existDatabase(entry.getKey()));
+
+        // checking all tables for this database
+        Collection<String> tablesForThisDatabase = catalog.getAllTableNames(entry.getKey());
+        assertEquals(createdTablesMap.get(entry.getKey()).size(), tablesForThisDatabase.size());
+        for (String tableName : tablesForThisDatabase) {
+          assertTrue(createdTablesMap.get(entry.getKey()).contains(CatalogUtil.extractSimpleName(tableName)));
+        }
+      }
+    }
+
+    // Finally, only default database will remain. So, its result is 1.
+    assertEquals(1, catalog.getAllDatabaseNames().size());
+  }
 	
 	@Test
 	public void testGetTable() throws Exception {
@@ -77,76 +276,78 @@ public class TestCatalog {
 		schema1.addColumn(FieldName2, Type.INT4);
 		schema1.addColumn(FieldName3, Type.INT8);
     Path path = new Path(CommonTestingUtil.getTestDir(), "table1");
-    TableDesc meta = CatalogUtil.newTableDesc(
-        "getTable",
+    TableDesc meta = new TableDesc(
+        CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "getTable"),
         schema1,
         StoreType.CSV,
         new Options(),
         path);
 
-		assertFalse(catalog.existsTable("getTable"));
-    catalog.addTable(meta);
-    assertTrue(catalog.existsTable("getTable"));
+		assertFalse(catalog.existsTable(DEFAULT_DATABASE_NAME, "getTable"));
+    catalog.createTable(meta);
+    assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, "getTable"));
 
-    catalog.deleteTable("getTable");
-    assertFalse(catalog.existsTable("getTable"));
+    catalog.dropTable(CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "getTable"));
+    assertFalse(catalog.existsTable(DEFAULT_DATABASE_NAME, "getTable"));
 	}
 
-	@Test
-	public void testAddTableNoName() throws Exception {
-	  schema1 = new Schema();
-    schema1.addColumn(FieldName1, Type.BLOB);
-    schema1.addColumn(FieldName2, Type.INT4);
-    schema1.addColumn(FieldName3, Type.INT8);
-    
-	  TableMeta info = CatalogUtil.newTableMeta(StoreType.CSV);
-	  TableDesc desc = new TableDesc();
-	  desc.setMeta(info);
-
-    assertFalse(catalog.addTable(desc));
-  }
-
   static IndexDesc desc1;
   static IndexDesc desc2;
   static IndexDesc desc3;
 
   static {
     desc1 = new IndexDesc(
-        "idx_test", "indexed", new Column("id", Type.INT4),
+        "idx_test", DEFAULT_DATABASE_NAME, "indexed", new Column("id", Type.INT4),
         IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
 
     desc2 = new IndexDesc(
-        "idx_test2", "indexed", new Column("score", Type.FLOAT8),
+        "idx_test2", DEFAULT_DATABASE_NAME, "indexed", new Column("score", Type.FLOAT8),
         IndexMethod.TWO_LEVEL_BIN_TREE, false, false, false);
 
     desc3 = new IndexDesc(
-        "idx_test", "indexed", new Column("id", Type.INT4),
+        "idx_test", DEFAULT_DATABASE_NAME, "indexed", new Column("id", Type.INT4),
         IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
   }
+
+  public static TableDesc prepareTable() throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("indexed.id", Type.INT4)
+        .addColumn("indexed.name", Type.TEXT)
+        .addColumn("indexed.age", Type.INT4)
+        .addColumn("indexed.score", Type.FLOAT8);
+
+    String tableName = "indexed";
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    return new TableDesc(
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, tableName), schema, meta,
+        new Path(CommonTestingUtil.getTestDir(), "indexed"));
+  }
 	
 	@Test
 	public void testAddAndDelIndex() throws Exception {
-	  TableDesc desc = TestDBStore.prepareTable();
-	  assertTrue(catalog.addTable(desc));
+	  TableDesc desc = prepareTable();
+	  assertTrue(catalog.createTable(desc));
 	  
-	  assertFalse(catalog.existIndex(desc1.getName()));
-	  assertFalse(catalog.existIndex("indexed", "id"));
-	  catalog.addIndex(desc1);
-	  assertTrue(catalog.existIndex(desc1.getName()));
-	  assertTrue(catalog.existIndex("indexed", "id"));
+	  assertFalse(catalog.existIndexByName("db1", desc1.getIndexName()));
+	  assertFalse(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "id"));
+	  catalog.createIndex(desc1);
+	  assertTrue(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc1.getIndexName()));
+	  assertTrue(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "id"));
+
+
+	  assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc2.getIndexName()));
+	  assertFalse(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "score"));
+	  catalog.createIndex(desc2);
+	  assertTrue(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc2.getIndexName()));
+	  assertTrue(catalog.existIndexByColumn(DEFAULT_DATABASE_NAME, "indexed", "score"));
 	  
-	  assertFalse(catalog.existIndex(desc2.getName()));
-	  assertFalse(catalog.existIndex("indexed", "score"));
-	  catalog.addIndex(desc2);
-	  assertTrue(catalog.existIndex(desc2.getName()));
-	  assertTrue(catalog.existIndex("indexed", "score"));
+	  catalog.dropIndex(DEFAULT_DATABASE_NAME, desc1.getIndexName());
+	  assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc1.getIndexName()));
+	  catalog.dropIndex(DEFAULT_DATABASE_NAME, desc2.getIndexName());
+	  assertFalse(catalog.existIndexByName(DEFAULT_DATABASE_NAME, desc2.getIndexName()));
 	  
-	  catalog.deleteIndex(desc1.getName());
-	  assertFalse(catalog.existIndex(desc1.getName()));
-	  catalog.deleteIndex(desc2.getName());
-	  assertFalse(catalog.existIndex(desc2.getName()));
-	  
-	  catalog.deleteTable(desc.getName());
+	  catalog.dropTable(desc.getName());
     assertFalse(catalog.existsTable(desc.getName()));
   }
 	
@@ -253,24 +454,26 @@ public class TestCatalog {
         .addColumn("age", Type.INT4)
         .addColumn("score", Type.FLOAT8);
 
-    String tableName = "addedtable";
+    String tableName = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "addedtable");
     Options opts = new Options();
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
-    partitionDesc.setTableId(tableName);
-    partitionDesc.setExpression("id");
+
     Schema partSchema = new Schema();
     partSchema.addColumn("id", Type.INT4);
-    partitionDesc.setExpressionSchema(partSchema);
-    partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
 
-    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+    PartitionMethodDesc partitionDesc =
+        new PartitionMethodDesc(DEFAULT_DATABASE_NAME, tableName,
+            CatalogProtos.PartitionType.HASH, "id", partSchema);
+
+    TableDesc desc =
+        new TableDesc(tableName, schema, meta,
+            new Path(CommonTestingUtil.getTestDir(), "addedtable"));
     desc.setPartitionMethod(partitionDesc);
 
     assertFalse(catalog.existsTable(tableName));
-    catalog.addTable(desc);
+    catalog.createTable(desc);
     assertTrue(catalog.existsTable(tableName));
     TableDesc retrieved = catalog.getTableDesc(tableName);
 
@@ -278,7 +481,7 @@ public class TestCatalog {
     assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.HASH);
     assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "id");
 
-    catalog.deleteTable(tableName);
+    catalog.dropTable(tableName);
     assertFalse(catalog.existsTable(tableName));
   }
 
@@ -291,24 +494,24 @@ public class TestCatalog {
         .addColumn("age", Type.INT4)
         .addColumn("score", Type.FLOAT8);
 
-    String tableName = "addedtable";
+    String tableName = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "addedtable");
     Options opts = new Options();
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
-    partitionDesc.setTableId(tableName);
-    partitionDesc.setExpression("id");
     Schema partSchema = new Schema();
     partSchema.addColumn("id", Type.INT4);
-    partitionDesc.setExpressionSchema(partSchema);
-    partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
+    PartitionMethodDesc partitionDesc =
+        new PartitionMethodDesc(DEFAULT_DATABASE_NAME, tableName,
+            CatalogProtos.PartitionType.HASH, "id", partSchema);
 
-    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+    TableDesc desc =
+        new TableDesc(tableName, schema, meta,
+            new Path(CommonTestingUtil.getTestDir(), "addedtable"));
     desc.setPartitionMethod(partitionDesc);
 
     assertFalse(catalog.existsTable(tableName));
-    catalog.addTable(desc);
+    catalog.createTable(desc);
     assertTrue(catalog.existsTable(tableName));
 
     TableDesc retrieved = catalog.getTableDesc(tableName);
@@ -317,7 +520,7 @@ public class TestCatalog {
     assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.HASH);
     assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "id");
 
-    catalog.deleteTable(tableName);
+    catalog.dropTable(tableName);
     assertFalse(catalog.existsTable(tableName));
   }
 
@@ -329,23 +532,23 @@ public class TestCatalog {
         .addColumn("age", Type.INT4)
         .addColumn("score", Type.FLOAT8);
 
-    String tableName = "addedtable";
+    String tableName = CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "addedtable");
     Options opts = new Options();
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
-    partitionDesc.setTableId(tableName);
-    partitionDesc.setExpression("id");
     Schema partSchema = new Schema();
     partSchema.addColumn("id", Type.INT4);
-    partitionDesc.setExpressionSchema(partSchema);
-    partitionDesc.setPartitionType(CatalogProtos.PartitionType.LIST);
+    PartitionMethodDesc partitionDesc =
+        new PartitionMethodDesc(DEFAULT_DATABASE_NAME, tableName,
+            CatalogProtos.PartitionType.LIST, "id", partSchema);
 
-    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+    TableDesc desc =
+        new TableDesc(tableName, schema, meta,
+            new Path(CommonTestingUtil.getTestDir(), "addedtable"));
     desc.setPartitionMethod(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
-    catalog.addTable(desc);
+    catalog.createTable(desc);
     assertTrue(catalog.existsTable(tableName));
 
     TableDesc retrieved = catalog.getTableDesc(tableName);
@@ -354,7 +557,7 @@ public class TestCatalog {
     assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.LIST);
     assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "id");
 
-    catalog.deleteTable(tableName);
+    catalog.dropTable(tableName);
     assertFalse(catalog.existsTable(tableName));
   }
 
@@ -366,24 +569,23 @@ public class TestCatalog {
         .addColumn("age", Type.INT4)
         .addColumn("score", Type.FLOAT8);
 
-    String tableName = "addedtable";
+    String tableName = CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "addedtable");
     Options opts = new Options();
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-
-    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
-    partitionDesc.setTableId(tableName);
-    partitionDesc.setExpression("id");
     Schema partSchema = new Schema();
     partSchema.addColumn("id", Type.INT4);
-    partitionDesc.setExpressionSchema(partSchema);
-    partitionDesc.setPartitionType(CatalogProtos.PartitionType.RANGE);
+    PartitionMethodDesc partitionDesc =
+        new PartitionMethodDesc(DEFAULT_DATABASE_NAME, tableName, CatalogProtos.PartitionType.RANGE,
+            "id", partSchema);
 
-    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+    TableDesc desc =
+        new TableDesc(tableName, schema, meta,
+            new Path(CommonTestingUtil.getTestDir(), "addedtable"));
     desc.setPartitionMethod(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
-    catalog.addTable(desc);
+    catalog.createTable(desc);
     assertTrue(catalog.existsTable(tableName));
 
     TableDesc retrieved = catalog.getTableDesc(tableName);
@@ -392,7 +594,7 @@ public class TestCatalog {
     assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.RANGE);
     assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "id");
 
-    catalog.deleteTable(tableName);
+    catalog.dropTable(tableName);
     assertFalse(catalog.existsTable(tableName));
   }
 
@@ -404,23 +606,24 @@ public class TestCatalog {
         .addColumn("age", Type.INT4)
         .addColumn("score", Type.FLOAT8);
 
-    String tableName = "addedtable";
+    String tableName = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "addedtable");
     Options opts = new Options();
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
-    partitionDesc.setTableId(tableName);
-    partitionDesc.setExpression("id");
     Schema partSchema = new Schema();
     partSchema.addColumn("id", Type.INT4);
-    partitionDesc.setExpressionSchema(partSchema);
-    partitionDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
 
-    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
+    PartitionMethodDesc partitionDesc =
+        new PartitionMethodDesc(DEFAULT_DATABASE_NAME, tableName,
+            CatalogProtos.PartitionType.COLUMN, "id", partSchema);
+
+    TableDesc desc =
+        new TableDesc(tableName, schema, meta,
+            new Path(CommonTestingUtil.getTestDir(), "addedtable"));
     desc.setPartitionMethod(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
-    catalog.addTable(desc);
+    catalog.createTable(desc);
     assertTrue(catalog.existsTable(tableName));
 
     TableDesc retrieved = catalog.getTableDesc(tableName);
@@ -429,8 +632,7 @@ public class TestCatalog {
     assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.COLUMN);
     assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "id");
 
-    catalog.deleteTable(tableName);
+    catalog.dropTable(tableName);
     assertFalse(catalog.existsTable(tableName));
   }
-
 }