You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/10/07 08:37:03 UTC

git commit: TAJO-179: Support MySQL CatalogStore. (jinho)

Updated Branches:
  refs/heads/master 14ed2ea23 -> c0c6d47f3


TAJO-179: Support MySQL CatalogStore. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/c0c6d47f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/c0c6d47f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/c0c6d47f

Branch: refs/heads/master
Commit: c0c6d47f3d0694458dfabf3d113b8fa941680622
Parents: 14ed2ea
Author: jinossy <ji...@gmail.com>
Authored: Mon Oct 7 15:35:32 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Oct 7 15:35:32 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/tajo/catalog/CatalogConstants.java   |  17 +-
 .../org/apache/tajo/catalog/CatalogUtil.java    |  20 +
 .../src/main/resources/catalog-default.xml      |   4 +-
 .../org/apache/tajo/catalog/CatalogServer.java  |   4 +-
 .../tajo/catalog/store/AbstractDBStore.java     | 738 +++++++++++++++
 .../apache/tajo/catalog/store/DerbyStore.java   | 886 +++++++++++++++++++
 .../apache/tajo/catalog/store/MySQLStore.java   | 184 ++++
 .../org/apache/tajo/catalog/TestDBStore.java    |  15 +-
 .../src/test/resources/catalog-default.xml      |   2 +-
 10 files changed, 1855 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 32685cb..f990530 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.2.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-179: Support MySQL CatalogStore. (jinho)
+
     TAJO-147: Implement trim(text), ltrim(text), and rtrim(text) function.
     (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 6625c7d..94736f7 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -20,11 +20,18 @@ package org.apache.tajo.catalog;
 
 public class CatalogConstants {
   public static final String STORE_CLASS="tajo.catalog.store.class";
-  
-  public static final String JDBC_DRIVER="tajo.catalog.jdbc.driver";
-  public static final String DEFAULT_JDBC_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
-  
+
+  //public static final String JDBC_DRIVER = "tajo.catalog.jdbc.driver";
+  public static final String CONNECTION_ID = "tajo.catalog.jdbc.connection.id";
+  public static final String CONNECTION_PASSWORD = "tajo.catalog.jdbc.connection.password";
+
   public static final String JDBC_URI="tajo.catalog.jdbc.uri";
+  public static final String TB_META = "META";
+  public static final String TB_TABLES = "TABLES";
+  public static final String TB_COLUMNS = "COLUMNS";
+  public static final String TB_OPTIONS = "OPTIONS";
+  public static final String TB_INDEXES = "INDEXES";
+  public static final String TB_STATISTICS = "STATS";
+  public static final String C_TABLE_ID = "TABLE_ID";
 
-  private CatalogConstants() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index fb896f1..6e17f42 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -29,6 +29,10 @@ import org.apache.tajo.util.FileUtil;
 
 import java.io.File;
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.sql.Wrapper;
 import java.util.Collection;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -178,4 +182,20 @@ public class CatalogUtil {
   public static DataType newDataTypeWithLen(Type type, int length) {
     return DataType.newBuilder().setType(type).setLength(length).build();
   }
+
+  public static void closeSQLWrapper(Wrapper... wrapper) {
+    if(wrapper == null) return;
+
+    for(Wrapper w : wrapper){
+      try{
+        if(w instanceof Statement){
+          ((Statement)w).close();
+        } else if(w instanceof ResultSet){
+          ((ResultSet)w).close();
+        } else if(w instanceof Connection){
+          ((Connection)w).close();
+        }
+      } catch (Exception e){}
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
index 6308410..0e9e109 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
+++ b/tajo-catalog/tajo-catalog-common/src/main/resources/catalog-default.xml
@@ -22,11 +22,11 @@
 <configuration>
   <property>
     <name>tajo.catalog.store.class</name>
-    <value>org.apache.tajo.catalog.store.DBStore</value>
+    <value>org.apache.tajo.catalog.store.DerbyStore</value>
   </property>
 
   <property>
     <name>tajo.catalog.jdbc.uri</name>
-    <value>jdbc:derby:/tmp/tcat-${user.name}/db</value>
+    <value>jdbc:derby:/tmp/tcat-${user.name}/db;create=true</value>
   </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 9c86d41..f810431 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -29,7 +29,7 @@ import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.store.CatalogStore;
-import org.apache.tajo.catalog.store.DBStore;
+import org.apache.tajo.catalog.store.DerbyStore;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -105,7 +105,7 @@ public class CatalogServer extends AbstractService {
     Constructor<?> cons;
     try {
       Class<?> storeClass =
-          this.conf.getClass(CatalogConstants.STORE_CLASS, DBStore.class);
+          this.conf.getClass(CatalogConstants.STORE_CLASS, DerbyStore.class);
 
       LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
       cons = storeClass.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
new file mode 100644
index 0000000..d6413f8
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -0,0 +1,738 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+public abstract class AbstractDBStore extends CatalogConstants implements CatalogStore {
+  protected final Log LOG = LogFactory.getLog(getClass());
+  protected Configuration conf;
+  protected String jdbcUri;
+  protected Connection conn;
+
+  protected static final int VERSION = 1;
+
+  protected abstract String getJDBCDriverName();
+
+  protected abstract Connection createConnection(final Configuration conf) throws SQLException;
+
+  protected abstract boolean isInitialized() throws SQLException;
+
+  protected abstract void createBaseTable() throws SQLException;
+
+  public AbstractDBStore(Configuration conf)
+      throws InternalException {
+
+    this.conf = conf;
+    this.jdbcUri = conf.get(JDBC_URI);
+    String jdbcDriver = getJDBCDriverName();
+    try {
+      Class.forName(getJDBCDriverName()).newInstance();
+      LOG.info("Loaded the JDBC driver (" + jdbcDriver + ")");
+    } catch (Exception e) {
+      throw new InternalException("Cannot load JDBC driver " + jdbcDriver, e);
+    }
+
+    try {
+      LOG.info("Trying to connect database (" + jdbcUri + ")");
+      conn = createConnection(conf);
+      LOG.info("Connected to database (" + jdbcUri + ")");
+    } catch (SQLException e) {
+      throw new InternalException("Cannot connect to database (" + jdbcUri
+          + ")", e);
+    }
+
+    try {
+      if (!isInitialized()) {
+        LOG.info("The base tables of CatalogServer are created.");
+        createBaseTable();
+      } else {
+        LOG.info("The base tables of CatalogServer already is initialized.");
+      }
+    } catch (SQLException se) {
+      throw new InternalException(
+          "Cannot initialize the persistent storage of Catalog", se);
+    }
+
+    int dbVersion = 0;
+    try {
+      dbVersion = needUpgrade();
+    } catch (SQLException e) {
+      throw new InternalException(
+          "Cannot check if the DB need to be upgraded", e);
+    }
+
+//    if (dbVersion < VERSION) {
+//      LOG.info("DB Upgrade is needed");
+//      try {
+//        upgrade(dbVersion, VERSION);
+//      } catch (SQLException e) {
+//        LOG.error(e.getMessage());
+//        throw new InternalException("DB upgrade is failed.", e);
+//      }
+//    }
+  }
+
+  protected String getJDBCUri(){
+    return jdbcUri;
+  }
+
+  private int needUpgrade() throws SQLException {
+    String sql = "SELECT VERSION FROM " + TB_META;
+    Statement stmt = conn.createStatement();
+    ResultSet res = stmt.executeQuery(sql);
+
+    if (!res.next()) { // if this db version is 0
+      insertVersion();
+      return 0;
+    } else {
+      return res.getInt(1);
+    }
+  }
+
+  private void insertVersion() throws SQLException {
+    String sql = "INSERT INTO " + TB_META + " values (0)";
+    Statement stmt = conn.createStatement();
+    stmt.executeUpdate(sql);
+    stmt.close();
+  }
+
+  private void upgrade(int from, int to) throws SQLException {
+    String sql;
+    Statement stmt;
+    if (from == 0) {
+      if (to == 1) {
+        sql = "DROP INDEX idx_options_key";
+        LOG.info(sql);
+
+        stmt = conn.createStatement();
+        stmt.addBatch(sql);
+
+        sql =
+            "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+        stmt.addBatch(sql);
+        LOG.info(sql);
+        stmt.executeBatch();
+        stmt.close();
+
+        LOG.info("DB Upgraded from " + from + " to " + to);
+      } else {
+        LOG.info("DB Upgraded from " + from + " to " + to);
+      }
+    }
+  }
+
+
+  @Override
+  public void addTable(final TableDesc table) throws IOException {
+    Statement stmt = null;
+    ResultSet res;
+
+    String sql =
+        "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
+            + "VALUES('" + table.getName() + "', "
+            + "'" + table.getPath() + "', "
+            + "'" + table.getMeta().getStoreType() + "'"
+            + ")";
+
+    try {
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+
+
+      stmt = conn.createStatement();
+      sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID
+          + " = '" + table.getName() + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no tid matched to "
+            + table.getName());
+      }
+      int tid = res.getInt("TID");
+
+      String colSql;
+      int columnId = 0;
+      for (Column col : table.getMeta().getSchema().getColumns()) {
+        colSql = columnToSQL(tid, table, columnId, col);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(colSql);
+        }
+        stmt.addBatch(colSql);
+        columnId++;
+      }
+
+      Iterator<Entry<String, String>> it = table.getMeta().toMap().entrySet().iterator();
+      String optSql;
+      while (it.hasNext()) {
+        optSql = keyvalToSQL(table, it.next());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(optSql);
+        }
+        stmt.addBatch(optSql);
+      }
+      stmt.executeBatch();
+      if (table.getMeta().getStat() != null) {
+        sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
+            + "VALUES ('" + table.getName() + "', "
+            + table.getMeta().getStat().getNumRows() + ","
+            + table.getMeta().getStat().getNumBytes() + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.addBatch(sql);
+        stmt.executeBatch();
+      }
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  private String columnToSQL(final int tid, final TableDesc desc,
+                             final int columnId, final Column col) {
+    String sql =
+        "INSERT INTO " + TB_COLUMNS
+            + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
+            + "VALUES("
+            + tid + ","
+            + "'" + desc.getName() + "',"
+            + columnId + ", "
+            + "'" + col.getColumnName() + "',"
+            + "'" + col.getDataType().getType().name() + "',"
+            + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
+            + ")";
+
+    return sql;
+  }
+
+  private String keyvalToSQL(final TableDesc desc,
+                             final Entry<String, String> keyVal) {
+    String sql =
+        "INSERT INTO " + TB_OPTIONS
+            + " (" + C_TABLE_ID + ", key_, value_) "
+            + "VALUES("
+            + "'" + desc.getName() + "',"
+            + "'" + keyVal.getKey() + "',"
+            + "'" + keyVal.getValue() + "'"
+            + ")";
+
+    return sql;
+  }
+
+  @Override
+  public boolean existTable(final String name) throws IOException {
+    StringBuilder sql = new StringBuilder();
+    sql.append("SELECT " + C_TABLE_ID + " from ")
+        .append(TB_TABLES)
+        .append(" WHERE " + C_TABLE_ID + " = '")
+        .append(name)
+        .append("'");
+
+    Statement stmt = null;
+    boolean exist = false;
+
+    try {
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql.toString());
+      }
+      ResultSet res = stmt.executeQuery(sql.toString());
+      exist = res.next();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+
+    return exist;
+  }
+
+  @Override
+  public void deleteTable(final String name) throws IOException {
+    Statement stmt = null;
+    String sql = null;
+
+    try {
+      stmt = conn.createStatement();
+      sql = "DELETE FROM " + TB_COLUMNS +
+          " WHERE " + C_TABLE_ID + " = '" + name + "'";
+      LOG.info(sql);
+      stmt.execute(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+
+    try {
+      sql = "DELETE FROM " + TB_OPTIONS +
+          " WHERE " + C_TABLE_ID + " = '" + name + "'";
+      LOG.info(sql);
+      stmt = conn.createStatement();
+      stmt.execute(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+
+    try {
+      sql = "DELETE FROM " + TB_STATISTICS +
+          " WHERE " + C_TABLE_ID + " = '" + name + "'";
+      LOG.info(sql);
+      stmt = conn.createStatement();
+      stmt.execute(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+
+    try {
+      sql = "DELETE FROM " + TB_TABLES +
+          " WHERE " + C_TABLE_ID + " = '" + name + "'";
+      LOG.info(sql);
+      stmt = conn.createStatement();
+      stmt.execute(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  @Override
+  public TableDesc getTable(final String name) throws IOException {
+    ResultSet res = null;
+    Statement stmt = null;
+
+    String tableName = null;
+    Path path = null;
+    StoreType storeType = null;
+    Options options;
+    TableStat stat = null;
+
+    try {
+      String sql =
+          "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+              + " WHERE " + C_TABLE_ID + "='" + name + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt = conn.createStatement();
+      res = stmt.executeQuery(sql);
+      if (!res.next()) { // there is no table of the given name.
+        return null;
+      }
+      tableName = res.getString(C_TABLE_ID).trim();
+      path = new Path(res.getString("path").trim());
+      storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    Schema schema = null;
+    try {
+      String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
+          + " WHERE " + C_TABLE_ID + "='" + name + "' ORDER by column_id asc";
+
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+
+      schema = new Schema();
+      while (res.next()) {
+        String columnName = tableName + "."
+            + res.getString("column_name").trim();
+        Type dataType = getDataType(res.getString("data_type")
+            .trim());
+        int typeLength = res.getInt("type_length");
+        if (typeLength > 0) {
+          schema.addColumn(columnName, dataType, typeLength);
+        } else {
+          schema.addColumn(columnName, dataType);
+        }
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    options = Options.create();
+    try {
+      String sql = "SELECT key_, value_ from " + TB_OPTIONS
+          + " WHERE " + C_TABLE_ID + "='" + name + "'";
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+
+      while (res.next()) {
+        options.put(
+            res.getString("key_"),
+            res.getString("value_"));
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    try {
+      String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
+          + " WHERE " + C_TABLE_ID + "='" + name + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt = conn.createStatement();
+      res = stmt.executeQuery(sql);
+
+      if (res.next()) {
+        stat = new TableStat();
+        stat.setNumRows(res.getLong("num_rows"));
+        stat.setNumBytes(res.getLong("num_bytes"));
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    TableMeta meta = new TableMetaImpl(schema, storeType, options);
+    if (stat != null) {
+      meta.setStat(stat);
+    }
+    TableDesc table = new TableDescImpl(tableName, meta, path);
+
+    return table;
+  }
+
+  private Type getDataType(final String typeStr) {
+    try {
+      return Enum.valueOf(Type.class, typeStr);
+    } catch (IllegalArgumentException iae) {
+      LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
+      return null;
+    }
+  }
+
+  @Override
+  public List<String> getAllTableNames() throws IOException {
+    String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
+
+    Statement stmt = null;
+    ResultSet res = null;
+
+    List<String> tables = new ArrayList<String>();
+
+    try {
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+      while (res.next()) {
+        tables.add(res.getString(C_TABLE_ID).trim());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    return tables;
+  }
+
+  @Override
+  public void addIndex(final IndexDescProto proto) throws IOException {
+    String sql =
+        "INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, "
+            + "data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
+            + "(?,?,?,?,?,?,?,?)";
+
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, proto.getName());
+      stmt.setString(2, proto.getTableId());
+      stmt.setString(3, proto.getColumn().getColumnName());
+      stmt.setString(4, proto.getColumn().getDataType().getType().name());
+      stmt.setString(5, proto.getIndexMethod().toString());
+      stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
+      stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
+      stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
+      stmt.executeUpdate();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  @Override
+  public void delIndex(final String indexName) throws IOException {
+    String sql =
+        "DELETE FROM " + TB_INDEXES
+            + " WHERE index_name='" + indexName + "'";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+
+    Statement stmt = null;
+
+    try {
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  @Override
+  public IndexDescProto getIndex(final String indexName)
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    IndexDescProto proto = null;
+
+    try {
+      String sql =
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+              + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+              + "where index_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to " + indexName);
+      }
+      proto = resultToProto(res);
+    } catch (SQLException se) {
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return proto;
+  }
+
+  @Override
+  public IndexDescProto getIndex(final String tableName,
+                                 final String columnName) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    IndexDescProto proto = null;
+
+    try {
+      String sql =
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+              + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+              + "where " + C_TABLE_ID + " = ? AND column_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to "
+            + tableName + "." + columnName);
+      }
+      proto = resultToProto(res);
+    } catch (SQLException se) {
+      new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return proto;
+  }
+
+  @Override
+  public boolean existIndex(final String indexName) throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES
+        + " WHERE index_name = ?";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    boolean exist = false;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      exist = res.next();
+    } catch (SQLException se) {
+
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return exist;
+  }
+
+  @Override
+  public boolean existIndex(String tableName, String columnName)
+      throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES
+        + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+
+    PreparedStatement stmt = null;
+    boolean exist = false;
+    ResultSet res = null;
+
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      exist = res.next();
+    } catch (SQLException se) {
+
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return exist;
+  }
+
+  @Override
+  public IndexDescProto[] getIndexes(final String tableName)
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+
+    try {
+      String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+          + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+          + "where " + C_TABLE_ID + "= ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      while (res.next()) {
+        protos.add(resultToProto(res));
+      }
+    } catch (SQLException se) {
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return protos.toArray(new IndexDescProto[protos.size()]);
+  }
+
+  private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+    IndexDescProto.Builder builder = IndexDescProto.newBuilder();
+    builder.setName(res.getString("index_name"));
+    builder.setTableId(res.getString(C_TABLE_ID));
+    builder.setColumn(resultToColumnProto(res));
+    builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
+    builder.setIsUnique(res.getBoolean("is_unique"));
+    builder.setIsClustered(res.getBoolean("is_clustered"));
+    builder.setIsAscending(res.getBoolean("is_ascending"));
+    return builder.build();
+  }
+
+  private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
+    ColumnProto.Builder builder = ColumnProto.newBuilder();
+    builder.setColumnName(res.getString("column_name"));
+    builder.setDataType(CatalogUtil.newSimpleDataType(getDataType(res.getString("data_type").trim())));
+    return builder.build();
+  }
+
+  private IndexMethod getIndexMethod(final String typeStr) {
+    if (typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString())) {
+      return IndexMethod.TWO_LEVEL_BIN_TREE;
+    } else {
+      LOG.error("Cannot find a matched type aginst from '"
+          + typeStr + "'");
+      // TODO - needs exception handling
+      return null;
+    }
+  }
+
+  @Override
+  public void close() {
+    CatalogUtil.closeSQLWrapper(conn);
+    LOG.info("Shutdown database (" + jdbcUri + ")");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
new file mode 100644
index 0000000..21d8a02
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -0,0 +1,886 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DerbyStore extends AbstractDBStore {
+  private static final String JDBC_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
+  protected Lock rlock;
+  protected Lock wlock;
+
+  protected String getJDBCDriverName(){
+    return JDBC_DRIVER;
+  }
+  
+  public DerbyStore(final Configuration conf)
+      throws InternalException {
+
+    super(conf);
+  }
+
+  protected Connection createConnection(Configuration conf) throws SQLException {
+    return DriverManager.getConnection(getJDBCUri());
+  }
+
+  // TODO - DDL and index statements should be renamed
+  protected void createBaseTable() throws SQLException {
+    wlock.lock();
+    Statement stmt = null;
+    try {
+      // META
+      stmt = conn.createStatement();
+      String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(meta_ddl);
+      }
+      stmt.executeUpdate(meta_ddl);
+      LOG.info("Table '" + TB_META + " is created.");
+
+      // TABLES
+      String tables_ddl = "CREATE TABLE "
+          + TB_TABLES + " ("
+          + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
+          + "path VARCHAR(1024), "
+          + "store_type CHAR(16), "
+          + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
+          ")";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(tables_ddl);
+      }
+      stmt.addBatch(tables_ddl);
+      String idx_tables_tid = 
+          "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_tables_tid);
+      }
+      stmt.addBatch(idx_tables_tid);
+      
+      String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on " 
+          + TB_TABLES + "(" + C_TABLE_ID + ")";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_tables_name);
+      }
+      stmt.addBatch(idx_tables_name);
+      stmt.executeBatch();
+      LOG.info("Table '" + TB_TABLES + "' is created.");
+
+      // COLUMNS
+      String columns_ddl = 
+          "CREATE TABLE " + TB_COLUMNS + " ("
+          + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
+          + C_TABLE_ID + ") ON DELETE CASCADE, "
+          + "column_id INT NOT NULL,"
+          + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+          + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(columns_ddl);
+      }
+      stmt.addBatch(columns_ddl);
+
+      String idx_fk_columns_table_name = 
+          "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
+          + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_fk_columns_table_name);
+      }
+      stmt.addBatch(idx_fk_columns_table_name);
+      stmt.executeBatch();
+      LOG.info("Table '" + TB_COLUMNS + " is created.");
+
+      // OPTIONS
+      String options_ddl = 
+          "CREATE TABLE " + TB_OPTIONS +" ("
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
+          + "ON DELETE CASCADE, "
+          + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(options_ddl);
+      }
+      stmt.addBatch(options_ddl);
+      
+      String idx_options_key = 
+          "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_options_key);
+      }
+      stmt.addBatch(idx_options_key);
+      String idx_options_table_name = 
+          "CREATE INDEX idx_options_table_name on " + TB_OPTIONS 
+          + "(" + C_TABLE_ID + ")";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_options_table_name);
+      }
+      stmt.addBatch(idx_options_table_name);
+      stmt.executeBatch();
+      LOG.info("Table '" + TB_OPTIONS + " is created.");
+      
+      // INDEXES
+      String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
+          + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+          + "ON DELETE CASCADE, "
+          + "column_name VARCHAR(255) NOT NULL, "
+          + "data_type VARCHAR(255) NOT NULL, "
+          + "index_type CHAR(32) NOT NULL, "
+          + "is_unique BOOLEAN NOT NULL, "
+          + "is_clustered BOOLEAN NOT NULL, "
+          + "is_ascending BOOLEAN NOT NULL)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(indexes_ddl);
+      }
+      stmt.addBatch(indexes_ddl);
+      
+      String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON " 
+          + TB_INDEXES + " (index_name)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_indexes_key);
+      }      
+      stmt.addBatch(idx_indexes_key);
+      
+      String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON " 
+          + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_indexes_columns);
+      } 
+      stmt.addBatch(idx_indexes_columns);
+      stmt.executeBatch();
+      LOG.info("Table '" + TB_INDEXES + "' is created.");
+
+      String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+          + "ON DELETE CASCADE, "
+          + "num_rows BIGINT, "
+          + "num_bytes BIGINT)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stats_ddl);
+      }
+      stmt.addBatch(stats_ddl);
+
+      String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
+          + TB_STATISTICS + " (" + C_TABLE_ID + ")";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(idx_stats_fk_table_name);
+      }
+      stmt.addBatch(idx_stats_fk_table_name);
+      stmt.executeBatch();
+      LOG.info("Table '" + TB_STATISTICS + "' is created.");
+
+    } finally {
+      wlock.unlock();
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  @Override
+  protected boolean isInitialized() throws SQLException {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.rlock = lock.readLock();
+    this.wlock = lock.writeLock();
+
+    wlock.lock();
+    ResultSet res = null;
+    try {
+      boolean found = false;
+      res = conn.getMetaData().getTables(null, null, null,
+          new String [] {"TABLE"});
+      
+      String resName;
+      while (res.next() && !found) {
+        resName = res.getString("TABLE_NAME");
+        if (TB_META.equals(resName)
+            || TB_TABLES.equals(resName)
+            || TB_COLUMNS.equals(resName)
+            || TB_OPTIONS.equals(resName)) {
+            return true;
+        }
+      }
+    } finally {
+      wlock.unlock();
+      CatalogUtil.closeSQLWrapper(res);
+    }    
+    return false;
+  }
+  
+  final boolean checkInternalTable(final String tableName) throws SQLException {
+    rlock.lock();
+    ResultSet res = null;
+    try {
+      boolean found = false;
+      res = conn.getMetaData().getTables(null, null, null,
+              new String [] {"TABLE"});
+      while(res.next() && !found) {
+        if (tableName.equals(res.getString("TABLE_NAME")))
+          found = true;
+      }
+      
+      return found;
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res);
+    }
+  }
+  
+  @Override
+  public final void addTable(final TableDesc table) throws IOException {
+    Statement stmt = null;
+    ResultSet res = null;
+
+    String sql = 
+        "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
+        + "VALUES('" + table.getName() + "', "
+        + "'" + table.getPath() + "', "
+        + "'" + table.getMeta().getStoreType() + "'"
+        + ")";
+
+    wlock.lock();
+    try {
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+
+      sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID 
+          + " = '" + table.getName() + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no tid matched to " 
+            + table.getName());
+      }
+      int tid = res.getInt("TID");
+
+      String colSql;
+      int columnId = 0;
+      for (Column col : table.getMeta().getSchema().getColumns()) {
+        colSql = columnToSQL(tid, table, columnId, col);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(colSql);
+        }
+        stmt.addBatch(colSql);
+        columnId++;
+      }
+      
+      Iterator<Entry<String,String>> it = table.getMeta().toMap().entrySet().iterator();
+      String optSql;
+      while (it.hasNext()) {
+        optSql = keyvalToSQL(table, it.next());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(optSql);
+        }
+        stmt.addBatch(optSql);
+      }
+      stmt.executeBatch();
+      if (table.getMeta().getStat() != null) {
+        sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
+            + "VALUES ('" + table.getName() + "', "
+            + table.getMeta().getStat().getNumRows() + ","
+            + table.getMeta().getStat().getNumBytes() + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.executeUpdate(sql);
+      }
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      wlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+  }
+  
+  private String columnToSQL(final int tid, final TableDesc desc, 
+      final int columnId, final Column col) {
+    String sql =
+        "INSERT INTO " + TB_COLUMNS 
+        + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
+        + "VALUES("
+        + tid + ","
+        + "'" + desc.getName() + "',"
+        + columnId + ", "
+        + "'" + col.getColumnName() + "',"
+        + "'" + col.getDataType().getType().name() + "',"
+        + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
+        + ")";
+    
+    return sql;
+  }
+  
+  private String keyvalToSQL(final TableDesc desc,
+      final Entry<String,String> keyVal) {
+    String sql =
+        "INSERT INTO " + TB_OPTIONS 
+        + " (" + C_TABLE_ID + ", key_, value_) "
+        + "VALUES("
+        + "'" + desc.getName() + "',"
+        + "'" + keyVal.getKey() + "',"
+        + "'" + keyVal.getValue() + "'"
+        + ")";
+    
+    return sql;
+  }
+  
+  @Override
+  public final boolean existTable(final String name) throws IOException {
+    StringBuilder sql = new StringBuilder();
+    sql.append("SELECT " + C_TABLE_ID + " from ")
+    .append(TB_TABLES)
+    .append(" WHERE " + C_TABLE_ID + " = '")
+    .append(name)
+    .append("'");
+
+    Statement stmt = null;
+    ResultSet res = null;
+    boolean exist = false;
+    rlock.lock();
+    try {
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql.toString());
+      }
+      res = stmt.executeQuery(sql.toString());
+      exist = res.next();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    
+    return exist;
+  }
+
+  @Override
+  public final void deleteTable(final String name) throws IOException {
+    Statement stmt = null;
+    String sql = null;
+    try {
+      wlock.lock();
+      stmt = conn.createStatement();
+      try {
+        sql = "DELETE FROM " + TB_COLUMNS +
+            " WHERE " + C_TABLE_ID + " = '" + name + "'";
+        LOG.info(sql);
+        stmt.execute(sql);
+      } catch (SQLException se) {
+        throw new IOException(se);
+      }
+
+      try {
+        sql = "DELETE FROM " + TB_OPTIONS +
+            " WHERE " + C_TABLE_ID + " = '" + name + "'";
+        LOG.info(sql);
+        stmt.execute(sql);
+      } catch (SQLException se) {
+        throw new IOException(se);
+      }
+
+      try {
+        sql = "DELETE FROM " + TB_STATISTICS +
+            " WHERE " + C_TABLE_ID + " = '" + name + "'";
+        LOG.info(sql);
+        stmt.execute(sql);
+      } catch (SQLException se) {
+        throw new IOException(se);
+      }
+
+      try {
+        sql = "DELETE FROM " + TB_TABLES +
+            " WHERE " + C_TABLE_ID +" = '" + name + "'";
+        LOG.info(sql);
+        stmt.execute(sql);
+      } catch (SQLException se) {
+        throw new IOException(se);
+      }
+
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      wlock.unlock();
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  @Override
+  public final TableDesc getTable(final String name) throws IOException {
+    ResultSet res = null;
+    Statement stmt = null;
+
+    String tableName = null;
+    Path path = null;
+    StoreType storeType = null;
+    Options options;
+    TableStat stat = null;
+
+    try {
+      rlock.lock();
+      stmt = conn.createStatement();
+
+      try {
+        String sql = 
+            "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+            + " WHERE " + C_TABLE_ID + "='" + name + "'";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+
+        res = stmt.executeQuery(sql);
+        if (!res.next()) { // there is no table of the given name.
+          return null;
+        }
+        tableName = res.getString(C_TABLE_ID).trim();
+        path = new Path(res.getString("path").trim());
+        storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+      } catch (SQLException se) { 
+        throw new IOException(se);
+      } finally {
+        CatalogUtil.closeSQLWrapper(res);
+      }
+      
+      Schema schema = null;
+      try {
+        String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
+            + " WHERE " + C_TABLE_ID + "='" + name + "' ORDER by column_id asc";
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        res = stmt.executeQuery(sql);
+
+        schema = new Schema();
+        while (res.next()) {
+          String columnName = tableName + "." 
+              + res.getString("column_name").trim();
+          Type dataType = getDataType(res.getString("data_type")
+              .trim());
+          int typeLength = res.getInt("type_length");
+          if (typeLength > 0) {
+            schema.addColumn(columnName, dataType, typeLength);
+          } else {
+            schema.addColumn(columnName, dataType);
+          }
+        }
+      } catch (SQLException se) {
+        throw new IOException(se);
+      } finally {
+        CatalogUtil.closeSQLWrapper(res);
+      }
+      
+      options = Options.create();
+      try {
+        String sql = "SELECT key_, value_ from " + TB_OPTIONS
+            + " WHERE " + C_TABLE_ID + "='" + name + "'";
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        res = stmt.executeQuery(sql);        
+        
+        while (res.next()) {
+          options.put(
+              res.getString("key_"),
+              res.getString("value_"));          
+        }
+      } catch (SQLException se) {
+        throw new IOException(se);
+      } finally {
+        CatalogUtil.closeSQLWrapper(res);
+      }
+
+      try {
+        String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
+            + " WHERE " + C_TABLE_ID + "='" + name + "'";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+
+        res = stmt.executeQuery(sql);
+
+        if (res.next()) {
+          stat = new TableStat();
+          stat.setNumRows(res.getLong("num_rows"));
+          stat.setNumBytes(res.getLong("num_bytes"));
+        }
+      } catch (SQLException se) {
+        throw new IOException(se);
+      } finally {
+        CatalogUtil.closeSQLWrapper(res);
+      }
+
+      TableMeta meta = new TableMetaImpl(schema, storeType, options);
+      if (stat != null) {
+        meta.setStat(stat);
+      }
+      TableDesc table = new TableDescImpl(tableName, meta, path);
+
+      return table;
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+  
+  private Type getDataType(final String typeStr) {
+    try {
+    return Enum.valueOf(Type.class, typeStr);
+    } catch (IllegalArgumentException iae) {
+      LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
+      return null;
+    }
+  }
+  
+  @Override
+  public final List<String> getAllTableNames() throws IOException {
+    String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
+    
+    Statement stmt = null;
+    ResultSet res = null;
+    
+    List<String> tables = new ArrayList<String>();
+    rlock.lock();
+    try {
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+      while (res.next()) {
+        tables.add(res.getString(C_TABLE_ID).trim());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    return tables;
+  }
+  
+  public final void addIndex(final IndexDescProto proto) throws IOException {
+    String sql = 
+        "INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, " 
+        +"data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
+        +"(?,?,?,?,?,?,?,?)";
+    
+    PreparedStatement stmt = null;
+
+    wlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, proto.getName());
+      stmt.setString(2, proto.getTableId());
+      stmt.setString(3, proto.getColumn().getColumnName());
+      stmt.setString(4, proto.getColumn().getDataType().getType().name());
+      stmt.setString(5, proto.getIndexMethod().toString());
+      stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
+      stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
+      stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
+      stmt.executeUpdate();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      wlock.unlock();
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+  
+  public final void delIndex(final String indexName) throws IOException {
+    String sql =
+        "DELETE FROM " + TB_INDEXES
+        + " WHERE index_name='" + indexName + "'";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+      
+      Statement stmt = null;
+      wlock.lock(); 
+      try {
+        stmt = conn.createStatement();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.executeUpdate(sql);
+      } catch (SQLException se) {
+        throw new IOException(se);
+      } finally {
+        wlock.unlock();
+        CatalogUtil.closeSQLWrapper(stmt);
+      }
+  }
+  
+  public final IndexDescProto getIndex(final String indexName) 
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+    
+    IndexDescProto proto = null;
+    
+    rlock.lock();
+    try {
+      String sql = 
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
+          + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+          + "where index_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to " + indexName);
+      }
+      proto = resultToProto(res);
+    } catch (SQLException se) {
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    
+    return proto;
+  }
+  
+  public final IndexDescProto getIndex(final String tableName, 
+      final String columnName) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+    
+    IndexDescProto proto = null;
+    
+    rlock.lock();
+    try {
+      String sql = 
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
+          + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+          + "where " + C_TABLE_ID + " = ? AND column_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to " 
+            + tableName + "." + columnName);
+      }      
+      proto = resultToProto(res);
+    } catch (SQLException se) {
+      new IOException(se);
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    
+    return proto;
+  }
+  
+  public final boolean existIndex(final String indexName) throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES 
+        + " WHERE index_name = ?";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    boolean exist = false;
+    rlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      exist = res.next();
+    } catch (SQLException se) {
+      
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    
+    return exist;
+  }
+  
+  @Override
+  public boolean existIndex(String tableName, String columnName)
+      throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES 
+        + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    boolean exist = false;
+    rlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      exist = res.next();
+    } catch (SQLException se) {
+      
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    
+    return exist;
+  }
+  
+  public final IndexDescProto [] getIndexes(final String tableName) 
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+    
+    List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+    
+    rlock.lock();
+    try {
+      String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
+          + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+          + "where " + C_TABLE_ID + "= ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();      
+      while (res.next()) {
+        protos.add(resultToProto(res));
+      }
+    } catch (SQLException se) {
+    } finally {
+      rlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    
+    return protos.toArray(new IndexDescProto [protos.size()]);
+  }
+  
+  private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+    IndexDescProto.Builder builder = IndexDescProto.newBuilder();
+    builder.setName(res.getString("index_name"));
+    builder.setTableId(res.getString(C_TABLE_ID));
+    builder.setColumn(resultToColumnProto(res));
+    builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
+    builder.setIsUnique(res.getBoolean("is_unique"));
+    builder.setIsClustered(res.getBoolean("is_clustered"));
+    builder.setIsAscending(res.getBoolean("is_ascending"));
+    return builder.build();
+  }
+  
+  private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
+    ColumnProto.Builder builder = ColumnProto.newBuilder();
+    builder.setColumnName(res.getString("column_name"));
+    builder.setDataType(CatalogUtil.newSimpleDataType(
+        getDataType(res.getString("data_type").trim())));
+    return builder.build();
+  }
+  
+  private IndexMethod getIndexMethod(final String typeStr) {
+    if (typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString())) {
+      return IndexMethod.TWO_LEVEL_BIN_TREE;
+    } else {
+      LOG.error("Cannot find a matched type aginst from '"
+          + typeStr + "'");
+      // TODO - needs exception handling
+      return null;
+    }
+  }
+  
+  @Override
+  public final void addFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  @Override
+  public final void deleteFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  @Override
+  public final void existFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  @Override
+  public final List<String> getAllFunctionNames() throws IOException {
+    // TODO - not implemented yet
+    return null;
+  }
+
+  @Override
+  public final void close() {
+    try {
+      DriverManager.getConnection("jdbc:derby:;shutdown=true");
+    } catch (SQLException e) {
+      // TODO - to be fixed
+      //LOG.error(e.getMessage(), e);
+    }
+    
+    LOG.info("Shutdown database (" + jdbcUri + ")");
+  }
+  
+  
+  public static void main(final String[] args) throws IOException {
+    @SuppressWarnings("unused")
+    DerbyStore store = new DerbyStore(new TajoConf());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
new file mode 100644
index 0000000..c638205
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.List;
+
+public class MySQLStore extends AbstractDBStore  {
+
+  private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";
+  protected String getJDBCDriverName(){
+    return JDBC_DRIVER;
+  }
+
+  public MySQLStore(Configuration conf) throws InternalException {
+    super(conf);
+  }
+
+  protected Connection createConnection(Configuration conf) throws SQLException {
+    Connection con = DriverManager.getConnection(getJDBCUri(), conf.get(CONNECTION_ID), conf.get(CONNECTION_PASSWORD));
+    //TODO con.setAutoCommit(false);
+    return con;
+  }
+
+  // TODO - DDL and index statements should be renamed
+  protected void createBaseTable() throws SQLException {
+
+    // META
+    Statement stmt = conn.createStatement();
+    String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(meta_ddl);
+    }
+    try {
+      int result = stmt.executeUpdate(meta_ddl);
+      LOG.info("Table '" + TB_META + " is created.");
+
+      // TABLES
+      String tables_ddl = "CREATE TABLE "
+          + TB_TABLES + " ("
+          + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
+          + "path TEXT, "
+          + "store_type CHAR(16)"
+          + ")";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(tables_ddl);
+      }
+
+      LOG.info("Table '" + TB_TABLES + "' is created.");
+      result = stmt.executeUpdate(tables_ddl);
+      // COLUMNS
+
+      String columns_ddl =
+          "CREATE TABLE " + TB_COLUMNS + " ("
+              + "TID INT NOT NULL,"
+              + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+              + "column_id INT NOT NULL,"
+              + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+              + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
+              + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
+              + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(columns_ddl);
+      }
+
+      LOG.info("Table '" + TB_COLUMNS + " is created.");
+      result = stmt.executeUpdate(columns_ddl);
+      // OPTIONS
+
+      String options_ddl =
+          "CREATE TABLE " + TB_OPTIONS + " ("
+              + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+              + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
+              + "INDEX("+C_TABLE_ID+", key_),"
+              + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(options_ddl);
+      }
+      LOG.info("Table '" + TB_OPTIONS + " is created.");
+      result = stmt.executeUpdate(options_ddl);
+      // INDEXES
+
+      String indexes_ddl = "CREATE TABLE " + TB_INDEXES + "("
+          + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+          + "column_name VARCHAR(255) NOT NULL, "
+          + "data_type VARCHAR(255) NOT NULL, "
+          + "index_type CHAR(32) NOT NULL, "
+          + "is_unique BOOLEAN NOT NULL, "
+          + "is_clustered BOOLEAN NOT NULL, "
+          + "is_ascending BOOLEAN NOT NULL,"
+          + "INDEX(" + C_TABLE_ID + ", column_name),"
+          + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(indexes_ddl);
+      }
+      LOG.info("Table '" + TB_INDEXES + "' is created.");
+      result = stmt.executeUpdate(indexes_ddl);
+
+      String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+          + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+          + "num_rows BIGINT, "
+          + "num_bytes BIGINT,"
+          + "INDEX("+C_TABLE_ID+"),"
+          + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stats_ddl);
+      }
+      LOG.info("Table '" + TB_STATISTICS + "' is created.");
+      result = stmt.executeUpdate(stats_ddl);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  protected boolean isInitialized() throws SQLException {
+    boolean found = false;
+    ResultSet res = conn.getMetaData().getTables(null, null, null,
+        new String[]{"TABLE"});
+
+    String resName;
+    try {
+      while (res.next() && !found) {
+        resName = res.getString("TABLE_NAME");
+        if (TB_META.equals(resName)
+            || TB_TABLES.equals(resName)
+            || TB_COLUMNS.equals(resName)
+            || TB_OPTIONS.equals(resName)) {
+          return true;
+        }
+      }
+    } finally {
+      CatalogUtil.closeSQLWrapper(res);
+    }
+    return false;
+  }
+
+  @Override
+  public final void addFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  @Override
+  public final void deleteFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  @Override
+  public final void existFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  @Override
+  public final List<String> getAllFunctionNames() throws IOException {
+    // TODO - not implemented yet
+    return null;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index 47ec4dd..183746d 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,15 +22,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.catalog.store.DBStore;
+import org.apache.tajo.catalog.store.AbstractDBStore;
+import org.apache.tajo.catalog.store.DerbyStore;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
 import java.io.File;
 
@@ -39,16 +40,16 @@ import static org.junit.Assert.*;
 public class TestDBStore {
   private static final Log LOG = LogFactory.getLog(TestDBStore.class);  
   private static Configuration conf;
-  private static DBStore store;
+  private static AbstractDBStore store;
 
   @BeforeClass
   public static void setUp() throws Exception {
     conf = new TajoConf();
     Path testDir = CommonTestingUtil.getTestDir("target/test-data/TestDBSTore");
     File absolutePath = new File(testDir.toUri());
-    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:"+absolutePath.getAbsolutePath()+"/db");
+    conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:"+absolutePath.getAbsolutePath()+"/db;create=true");
     LOG.info("derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
-    store = new DBStore(conf);
+    store = new DerbyStore(conf);
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/c0c6d47f/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml b/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
index 5349618..a6b2183 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
+++ b/tajo-catalog/tajo-catalog-server/src/test/resources/catalog-default.xml
@@ -32,6 +32,6 @@
 
   <property>
     <name>tajo.catalog.jdbc.uri</name>
-    <value>jdbc:derby:target/test-data/tcat/db</value>
+    <value>jdbc:derby:target/test-data/tcat/db;create=true</value>
   </property>
 </configuration>