You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/27 09:29:38 UTC

[3/4] TAJO-475: Table partition catalog recap. (Min Zhou and hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 60c88e9..5d6f747 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -21,19 +21,21 @@
  */
 package org.apache.tajo.catalog.store;
 
+import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.CatalogConstants;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
+import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
+
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.exception.InternalException;
 
@@ -43,7 +45,6 @@ import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map.Entry;
 
 public abstract class AbstractDBStore extends CatalogConstants implements CatalogStore {
   protected final Log LOG = LogFactory.getLog(getClass());
@@ -60,9 +61,9 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
   protected abstract Connection createConnection(final Configuration conf) throws SQLException;
 
-  protected abstract boolean isInitialized() throws SQLException;
+  protected abstract boolean isInitialized() throws IOException;
 
-  protected abstract void createBaseTable() throws SQLException;
+  protected abstract void createBaseTable() throws IOException;
 
   public AbstractDBStore(Configuration conf)
       throws InternalException {
@@ -117,7 +118,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       } else {
         LOG.info("The base tables of CatalogServer already is initialized.");
       }
-    } catch (SQLException se) {
+    } catch (IOException se) {
       throw new InternalException(
           "Cannot initialize the persistent storage of Catalog", se);
     }
@@ -125,7 +126,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     int dbVersion = 0;
     try {
       dbVersion = needUpgrade();
-    } catch (SQLException e) {
+    } catch (IOException e) {
       throw new InternalException(
           "Cannot check if the DB need to be upgraded", e);
     }
@@ -145,250 +146,189 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     return catalogUri;
   }
 
-  public Connection getConnection() throws SQLException{
-    boolean isValid = false;
-    try{
-      isValid = conn.isValid(100);
-    } catch (SQLException e){
-    }
-
-    if(!isValid){
+  public Connection getConnection() throws IOException {
+    try {
+      boolean isValid = conn.isValid(100);
+      if (!isValid) {
+        CatalogUtil.closeSQLWrapper(conn);
+        conn = createConnection(conf);
+      }
+    } catch (SQLException e) {
       CatalogUtil.closeSQLWrapper(conn);
-      conn = createConnection(conf);
+      throw new IOException(e);
     }
     return conn;
   }
 
-  private int needUpgrade() throws SQLException {
-    String sql = "SELECT VERSION FROM " + TB_META;
-    Statement stmt = getConnection().createStatement();
-    ResultSet res = stmt.executeQuery(sql);
+  private int needUpgrade() throws IOException {
+    Statement stmt = null;
+    try {
+      String sql = "SELECT VERSION FROM " + TB_META;
+      stmt = getConnection().createStatement();
+      ResultSet res = stmt.executeQuery(sql);
 
-    if (!res.next()) { // if this db version is 0
-      insertVersion();
-      return 0;
-    } else {
-      return res.getInt(1);
+      if (!res.next()) { // if this db version is 0
+        insertVersion();
+        return 0;
+      } else {
+        return res.getInt(1);
+      }
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
     }
   }
 
-  private void insertVersion() throws SQLException {
-    String sql = "INSERT INTO " + TB_META + " values (0)";
-    Statement stmt = getConnection().createStatement();
-    stmt.executeUpdate(sql);
-    stmt.close();
+  private void insertVersion() throws IOException {
+    Statement stmt = null;
+    try {
+      String sql = "INSERT INTO " + TB_META + " values (0)";
+      stmt = getConnection().createStatement();
+      stmt.executeUpdate(sql);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
   }
 
-  private void upgrade(int from, int to) throws SQLException {
-    String sql;
-    Statement stmt;
-    if (from == 0) {
-      if (to == 1) {
-        sql = "DROP INDEX idx_options_key";
-        LOG.info(sql);
+  private void upgrade(int from, int to) throws IOException {
+    Statement stmt = null;
+    try {
+      if (from == 0) {
+        if (to == 1) {
+          String sql = "DROP INDEX idx_options_key";
+          LOG.info(sql);
 
-        stmt = getConnection().createStatement();
-        stmt.addBatch(sql);
+          stmt = getConnection().createStatement();
+          stmt.addBatch(sql);
 
-        sql =
-            "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
-        stmt.addBatch(sql);
-        LOG.info(sql);
-        stmt.executeBatch();
-        stmt.close();
+          sql =
+              "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+          stmt.addBatch(sql);
+          LOG.info(sql);
+          stmt.executeBatch();
 
-        LOG.info("DB Upgraded from " + from + " to " + to);
-      } else {
-        LOG.info("DB Upgraded from " + from + " to " + to);
+          LOG.info("DB Upgraded from " + from + " to " + to);
+        } else {
+          LOG.info("DB Upgraded from " + from + " to " + to);
+        }
       }
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
     }
   }
 
 
   @Override
-  public void addTable(final TableDesc table) throws IOException {
-    Statement stmt = null;
+  public void addTable(final CatalogProtos.TableDescProto table) throws IOException {
     PreparedStatement pstmt = null;
+    ResultSet res = null;
+    Connection conn = getConnection();
+    try {
+      conn.setAutoCommit(false);
 
-    ResultSet res;
-
-    String sql =
-        "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
-            + "VALUES('" + table.getName() + "', "
-            + "'" + table.getPath() + "', "
-            + "'" + table.getMeta().getStoreType() + "'"
-            + ")";
+      String tableName = table.getId().toLowerCase();
 
-    try {
-      stmt = getConnection().createStatement();
+      String sql = String.format("INSERT INTO %s (%s, path, store_type) VALUES(?, ?, ?) ", TB_TABLES, C_TABLE_ID);
+      pstmt = conn.prepareStatement(sql);
+      pstmt.setString(1, tableName);
+      pstmt.setString(2, table.getPath());
+      pstmt.setString(3, table.getMeta().getStoreType().name());
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
       }
-      stmt.executeUpdate(sql);
+      pstmt.executeUpdate();
 
-
-      stmt = getConnection().createStatement();
-      sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID
-          + " = '" + table.getName() + "'";
+      String tidSql = String.format("SELECT TID from %s WHERE %s = ?", TB_TABLES, C_TABLE_ID);
+      pstmt = conn.prepareStatement(tidSql);
+      pstmt.setString(1, tableName);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
+        LOG.debug(tidSql);
       }
-      res = stmt.executeQuery(sql);
+      res = pstmt.executeQuery();
       if (!res.next()) {
         throw new IOException("ERROR: there is no tid matched to "
-            + table.getName());
+            + table.getId());
       }
       int tid = res.getInt("TID");
 
-      String colSql;
-      int columnId = 0;
-      for (Column col : table.getSchema().getColumns()) {
-        colSql = columnToSQL(tid, table, columnId, col);
+      String colSql = String.format("INSERT INTO %s (TID, %s, column_id, column_name, data_type, type_length)"
+          + " VALUES(?, ?, ?, ?, ?, ?) ", TB_COLUMNS, C_TABLE_ID);
+      pstmt = conn.prepareStatement(colSql);
+      for(int i = 0; i < table.getSchema().getFieldsCount(); i++) {
+        ColumnProto col = table.getSchema().getFields(i);
+        pstmt.setInt(1, tid);
+        pstmt.setString(2, tableName);
+        pstmt.setInt(3, i);
+        pstmt.setString(4, col.getColumnName());
+        pstmt.setString(5, col.getDataType().getType().name());
+        pstmt.setInt(6, (col.getDataType().hasLength() ? col.getDataType().getLength() : 0));
+        pstmt.addBatch();
         if (LOG.isDebugEnabled()) {
           LOG.debug(colSql);
         }
-        stmt.addBatch(colSql);
-        columnId++;
       }
+      pstmt.executeBatch();
 
-
-      String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
-      pstmt = getConnection().prepareStatement(optSql);
-      try {
-        for (Entry<String, String> entry : table.getMeta().toMap().entrySet()) {
-          pstmt.setString(1, table.getName());
+      if(table.getMeta().hasParams()) {
+        String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
+        pstmt = conn.prepareStatement(optSql);
+        for (CatalogProtos.KeyValueProto entry : table.getMeta().getParams().getKeyvalList()) {
+          pstmt.setString(1, tableName);
           pstmt.setString(2, entry.getKey());
           pstmt.setString(3, entry.getValue());
+          pstmt.addBatch();
           if (LOG.isDebugEnabled()) {
             LOG.debug(optSql);
           }
-          pstmt.addBatch();
         }
         pstmt.executeBatch();
-      } finally {
-        CatalogUtil.closeSQLWrapper(pstmt);
       }
 
-      if (table.getStats() != null) {
-        sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
-            + "VALUES ('" + table.getName() + "', "
-            + table.getStats().getNumRows() + ","
-            + table.getStats().getNumBytes() + ")";
+
+      if (table.hasStats()) {
+        String statSql =
+            String.format("INSERT INTO %s (%s, num_rows, num_bytes) VALUES(?, ?, ?)", TB_STATISTICS, C_TABLE_ID);
+        pstmt = conn.prepareStatement(statSql);
+        pstmt.setString(1, tableName);
+        pstmt.setLong(2, table.getStats().getNumRows());
+        pstmt.setLong(3, table.getStats().getNumBytes());
+        pstmt.addBatch();
         if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
+          LOG.debug(statSql);
         }
-        stmt.addBatch(sql);
-        stmt.executeBatch();
-      }
-
-      //Partition
-      if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
-        try {
-          PartitionDesc partitionDesc = table.getPartitions();
-          List<Column> columnList = partitionDesc.getColumns();
-
-          // Find columns which used for a partitioned table.
-          StringBuffer columns = new StringBuffer();
-          for(Column eachColumn : columnList) {
-            sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
-                + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(sql);
-            }
-            res = stmt.executeQuery(sql);
-            if (!res.next()) {
-              throw new IOException("ERROR: there is no columnId matched to "
-                  + table.getName());
-            }
-            columnId = res.getInt("column_id");
-
-            if (columns.length() > 0) {
-              columns.append(",");
-            }
-            columns.append(columnId);
-          }
-
-          // Set default partition name. But if user named to subpartition, it would be updated.
-//          String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
-
-          sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
-              + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
-          pstmt = getConnection().prepareStatement(sql);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(sql);
-          }
+        pstmt.executeBatch();
+      }
 
-          // Find information for subpartitions
-          if (partitionDesc.getSpecifiers() != null) {
-            int count = 1;
-            if (partitionDesc.getSpecifiers().size() == 0) {
-              pstmt.clearParameters();
-              pstmt.setString(1, null);
-              pstmt.setInt(2, tid);
-              pstmt.setString(3, partitionDesc.getPartitionsType().name());
-              pstmt.setInt(4, partitionDesc.getNumPartitions());
-              pstmt.setString(5, columns.toString());
-              pstmt.setString(6, null);
-              pstmt.addBatch();
-            } else {
-              for(Specifier specifier: partitionDesc.getSpecifiers()) {
-                pstmt.clearParameters();
-                if (specifier.getName() != null && !specifier.getName().equals("")) {
-                  pstmt.setString(1, specifier.getName());
-                } else {
-                  pstmt.setString(1, null);
-                }
-                pstmt.setInt(2, tid);
-                pstmt.setString(3, partitionDesc.getPartitionsType().name());
-                pstmt.setInt(4, partitionDesc.getNumPartitions());
-                pstmt.setString(5, columns.toString());
-                pstmt.setString(6, specifier.getExpressions());
-                pstmt.addBatch();
-                count++;
-              }
-            }
-          } else {
-            pstmt.clearParameters();
-            pstmt.setString(1, null);
-            pstmt.setInt(2, tid);
-            pstmt.setString(3, partitionDesc.getPartitionsType().name());
-            pstmt.setInt(4, partitionDesc.getNumPartitions());
-            pstmt.setString(5, columns.toString());
-            pstmt.setString(6, null);
-            pstmt.addBatch();
-          }
-          pstmt.executeBatch();
-        } finally {
-          CatalogUtil.closeSQLWrapper(pstmt);
-        }
+      if(table.hasPartition()) {
+        String partSql =
+            String.format("INSERT INTO %s (%s, partition_type, expression, expression_schema) VALUES(?, ?, ?, ?)",
+                TB_PARTITION_METHODS, C_TABLE_ID);
+        pstmt = conn.prepareStatement(partSql);
+        pstmt.setString(1, tableName);
+        pstmt.setString(2, table.getPartition().getPartitionType().name());
+        pstmt.setString(3, table.getPartition().getExpression());
+        pstmt.setBytes(4, table.getPartition().getExpressionSchema().toByteArray());
+        pstmt.executeUpdate();
       }
 
+      conn.commit();
     } catch (SQLException se) {
+      try {
+        conn.rollback();
+      } catch (SQLException rbe) {
+        throw new IOException(se.getMessage(), rbe);
+      }
       throw new IOException(se.getMessage(), se);
     } finally {
       CatalogUtil.closeSQLWrapper(pstmt);
-      CatalogUtil.closeSQLWrapper(stmt);
     }
   }
 
-  private String columnToSQL(final int tid, final TableDesc desc,
-                             final int columnId, final Column col) {
-    String sql =
-        "INSERT INTO " + TB_COLUMNS
-            + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
-            + "VALUES("
-            + tid + ","
-            + "'" + desc.getName() + "',"
-            + columnId + ", "
-            + "'" + col.getColumnName() + "',"
-            + "'" + col.getDataType().getType().name() + "',"
-            + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
-            + ")";
-
-    return sql;
-  }
-
   @Override
   public boolean existTable(final String name) throws IOException {
     StringBuilder sql = new StringBuilder();
@@ -423,6 +363,14 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     String sql = null;
 
     try {
+      getConnection().setAutoCommit(false);
+
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+    try {
       stmt = getConnection().createStatement();
       sql = "DELETE FROM " + TB_COLUMNS +
           " WHERE " + C_TABLE_ID + " = '" + name + "'";
@@ -460,9 +408,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
 
     try {
-      sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
-        + " SELECT TID FROM " + TB_TABLES
-        + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+      sql = "DELETE FROM " + TB_PARTTIONS +
+          " WHERE " + C_TABLE_ID + " = '" + name + "'";
       LOG.info(sql);
       stmt = getConnection().createStatement();
       stmt.execute(sql);
@@ -487,21 +434,15 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
   }
 
   @Override
-  public TableDesc getTable(final String name) throws IOException {
+  public CatalogProtos.TableDescProto getTable(final String name) throws IOException {
     ResultSet res = null;
     Statement stmt = null;
 
-    String tableName = null;
-    Path path = null;
+    CatalogProtos.TableDescProto.Builder tableBuilder = CatalogProtos.TableDescProto.newBuilder();
     StoreType storeType = null;
-    Options options;
-    TableStats stat = null;
-    PartitionDesc partitionDesc = null;
-    int tid = 0;
-
     try {
       String sql =
-          "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
+          "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
               + " WHERE " + C_TABLE_ID + "='" + name + "'";
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
@@ -511,17 +452,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       if (!res.next()) { // there is no table of the given name.
         return null;
       }
-      tableName = res.getString(C_TABLE_ID).trim();
-      path = new Path(res.getString("path").trim());
+      tableBuilder.setId(res.getString(C_TABLE_ID).trim());
+      tableBuilder.setPath(res.getString("path").trim());
+
       storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
-      tid = res.getInt("TID");
+
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
 
-    Schema schema = null;
+    CatalogProtos.SchemaProto.Builder schemaBuilder = CatalogProtos.SchemaProto.newBuilder();
     try {
       String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
           + " WHERE " + C_TABLE_ID + "='" + name + "' ORDER by column_id asc";
@@ -531,27 +473,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
         LOG.debug(sql);
       }
       res = stmt.executeQuery(sql);
-
-      schema = new Schema();
       while (res.next()) {
-        String columnName = tableName + "."
-            + res.getString("column_name").trim();
-        Type dataType = getDataType(res.getString("data_type")
-            .trim());
-        int typeLength = res.getInt("type_length");
-        if (typeLength > 0) {
-          schema.addColumn(columnName, dataType, typeLength);
-        } else {
-          schema.addColumn(columnName, dataType);
-        }
+        schemaBuilder.addFields(resultToColumnProto(res));
       }
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
+    tableBuilder.setSchema(CatalogUtil.getQualfiedSchema(name, schemaBuilder.build()));
 
-    options = Options.create();
+    CatalogProtos.TableProto.Builder metaBuilder = CatalogProtos.TableProto.newBuilder();
+    metaBuilder.setStoreType(storeType);
     try {
       String sql = "SELECT key_, value_ from " + TB_OPTIONS
           + " WHERE " + C_TABLE_ID + "='" + name + "'";
@@ -560,17 +493,13 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
         LOG.debug(sql);
       }
       res = stmt.executeQuery(sql);
-
-      while (res.next()) {
-        options.put(
-            res.getString("key_"),
-            res.getString("value_"));
-      }
+      metaBuilder.setParams(resultToKeyValueSetProto(res));
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
+    tableBuilder.setMeta(metaBuilder);
 
     try {
       String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
@@ -582,9 +511,10 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       res = stmt.executeQuery(sql);
 
       if (res.next()) {
-        stat = new TableStats();
-        stat.setNumRows(res.getLong("num_rows"));
-        stat.setNumBytes(res.getLong("num_bytes"));
+        TableStatsProto.Builder statBuilder = TableStatsProto.newBuilder();
+        statBuilder.setNumRows(res.getLong("num_rows"));
+        statBuilder.setNumBytes(res.getLong("num_bytes"));
+        tableBuilder.setStats(statBuilder);
       }
     } catch (SQLException se) {
       throw new IOException(se);
@@ -593,53 +523,30 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     }
 
     try {
-      String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
-          + " WHERE TID =" + tid + "";
-      stmt = getConnection().createStatement();
+      String sql =
+          "SELECT partition_type, expression, expression_schema from " + TB_PARTITION_METHODS
+              + " WHERE " + C_TABLE_ID + "='" + name + "'";
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
       }
+      stmt = getConnection().createStatement();
       res = stmt.executeQuery(sql);
 
-      while (res.next()) {
-        if (partitionDesc == null) {
-          partitionDesc = new PartitionDesc();
-          String[] columns = res.getString("columns").split(",");
-          for(String eachColumn: columns) {
-            partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
-          }
-          partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
-          partitionDesc.setNumPartitions(res.getInt("quantity"));
-        }
-
-        Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
-        partitionDesc.addSpecifier(specifier);
+      if (res.next()) {
+        tableBuilder.setPartition(resultToPartitionMethodProto(name, res));
       }
-
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
 
-    TableMeta meta = new TableMeta(storeType, options);
-    TableDesc table = new TableDesc(tableName, schema, meta, path);
-    if (stat != null) {
-      table.setStats(stat);
-    }
-
-    if (partitionDesc != null) {
-      table.setPartitions(partitionDesc);
-    }
-
-    return table;
+    return tableBuilder.build();
   }
 
-  private Column getColumn(String tableName, int tid, String columnId) throws IOException {
+  private ColumnProto getColumn(String tableName, int tid, String columnId) throws IOException {
     ResultSet res = null;
-    Column column = null;
     Statement stmt = null;
-
     try {
       String sql = "SELECT column_name, data_type, type_length from "
           + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
@@ -651,23 +558,14 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       res = stmt.executeQuery(sql);
 
       if (res.next()) {
-        String columnName = tableName + "."
-            + res.getString("column_name").trim();
-        Type dataType = getDataType(res.getString("data_type")
-            .trim());
-        int typeLength = res.getInt("type_length");
-        if (typeLength > 0) {
-          column = new Column(columnName, dataType, typeLength);
-        } else {
-          column = new Column(columnName, dataType);
-        }
+        return resultToColumnProto(res);
       }
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
-    return column;
+    return null;
   }
 
   private Type getDataType(final String typeStr) {
@@ -706,6 +604,266 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
   }
 
   @Override
+  public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws  IOException {
+    PreparedStatement pstmt = null;
+    String sql = "INSERT INTO " + TB_PARTTIONS + " (" + C_TABLE_ID
+        + ",  partition_name, ordinal_position, path, cache_nodes) "
+        + "VALUES (?, ?, ?, ?, ?, ?) ";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+    try {
+      pstmt = getConnection().prepareStatement(sql);
+      for (CatalogProtos.PartitionDescProto proto : partitionsProto.getPartitionList()) {
+        pstmt.setString(1, proto.getTableId());
+        pstmt.setString(2, proto.getPartitionName());
+        pstmt.setInt(3, proto.getOrdinalPosition());
+        pstmt.setString(4, proto.getPartitionValue());
+        pstmt.setString(5, proto.getPath());
+        pstmt.addBatch();
+      }
+      pstmt.executeBatch();
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(pstmt);
+    }
+  }
+
+  @Override
+  public void addPartitionMethod(CatalogProtos.PartitionMethodProto proto) throws IOException {
+    PreparedStatement pstmt = null;
+    String sql = "INSERT INTO " + TB_PARTITION_METHODS + " (" + C_TABLE_ID
+        + ", partition_type,  expression, expression_schema) "
+        + "VALUES (?, ?, ?, ?) ";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+
+    try {
+      pstmt = getConnection().prepareStatement(sql);
+      pstmt.setString(1, proto.getTableId().toLowerCase());
+      pstmt.setString(2, proto.getPartitionType().name());
+      pstmt.setString(3, proto.getExpression());
+      pstmt.setBytes(4, proto.getExpressionSchema().toByteArray());
+      pstmt.executeUpdate();
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(pstmt);
+    }
+  }
+
+  @Override
+  public void delPartitionMethod(String tableName) throws IOException {
+    String sql =
+        "DELETE FROM " + TB_PARTITION_METHODS
+            + " WHERE " + C_TABLE_ID + " ='" + tableName + "'";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+    Statement stmt = null;
+
+    try {
+      stmt = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  @Override
+  public CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws IOException {
+    ResultSet res = null;
+    Statement stmt = null;
+
+    try {
+      String sql =
+          "SELECT partition_type, expression, expression_schema from " + TB_PARTITION_METHODS
+              + " WHERE " + C_TABLE_ID + "='" + tableName + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt = getConnection().createStatement();
+      res = stmt.executeQuery(sql);
+
+      if (res.next()) {
+        return resultToPartitionMethodProto(tableName, res);
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    return null;
+  }
+
+  @Override
+  public boolean existPartitionMethod(String tableName) throws IOException {
+    ResultSet res = null;
+    Statement stmt = null;
+    boolean exist = false;
+    try {
+      String sql =
+          "SELECT partition_type, expression, expression_schema from " + TB_PARTITION_METHODS
+              + " WHERE " + C_TABLE_ID + "='" + tableName + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt = getConnection().createStatement();
+      res = stmt.executeQuery(sql);
+      exist = res.next();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    return exist;
+  }
+
+  @Override
+  public void addPartition(CatalogProtos.PartitionDescProto proto) throws IOException {
+    PreparedStatement pstmt = null;
+    String sql = "INSERT INTO " + TB_PARTTIONS + " (" + C_TABLE_ID
+        + ", partition_method_id,  partition_name, ordinal_position, path, cache_nodes) "
+        + "VALUES (?, ?, ?, ?, ?, ?, ?) ";
+     if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+
+    try {
+      pstmt = getConnection().prepareStatement(sql);
+      pstmt.setString(1, proto.getTableId().toLowerCase());
+      pstmt.setString(2, proto.getPartitionName().toLowerCase());
+      pstmt.setInt(3, proto.getOrdinalPosition());
+      pstmt.setString(4, proto.getPartitionValue());
+      pstmt.setString(5, proto.getPath());
+      pstmt.executeUpdate();
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(pstmt);
+    }
+  }
+
+  @Override
+  public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+    CatalogProtos.PartitionDescProto proto = null;
+
+    String sql = "SELECT " + C_TABLE_ID
+        + ", partition_name, ordinal_position, partition_value, path, cache_nodes FROM "
+        + TB_PARTTIONS + "where partition_name = ?";
+
+    try {
+      stmt = getConnection().prepareStatement(sql);
+      stmt.setString(1, partitionName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to " + partitionName);
+      }
+
+      proto = resultToPartitionDescProto(res);
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return proto;
+  }
+
+
+  @Override
+  public CatalogProtos.PartitionsProto getPartitions(String tableName) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+    CatalogProtos.PartitionsProto proto = null;
+
+    String partsSql = "SELECT " + C_TABLE_ID
+        + ", partition_name, ordinal_position, partition_value, path, cache_nodes FROM "
+        + TB_PARTTIONS + "where " + C_TABLE_ID + " = ?";
+
+    try {
+      // PARTITIONS
+      stmt = getConnection().prepareStatement(partsSql);
+      stmt.setString(1, tableName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+      res = stmt.executeQuery();
+      CatalogProtos.PartitionsProto.Builder builder = CatalogProtos.PartitionsProto.newBuilder();
+      while(res.next()) {
+        builder.addPartition(resultToPartitionDescProto(res));
+      }
+      proto = builder.build();
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
+    return proto;
+  }
+
+
+  @Override
+  public void delPartition(String partitionName) throws IOException {
+    String sql =
+        "DELETE FROM " + TB_PARTTIONS
+            + " WHERE partition_name ='" + partitionName + "'";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+    Statement stmt = null;
+
+    try {
+      stmt = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+  @Override
+  public void delPartitions(String tableName) throws IOException {
+    String sql =
+        "DELETE FROM " + TB_PARTTIONS
+            + " WHERE " + C_TABLE_ID + " ='" + tableName + "'";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(sql);
+    }
+    Statement stmt = null;
+
+    try {
+      stmt = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+  }
+
+
+  @Override
   public void addIndex(final IndexDescProto proto) throws IOException {
     String sql =
         "INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, "
@@ -781,7 +939,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       if (!res.next()) {
         throw new IOException("ERROR: there is no index matched to " + indexName);
       }
-      proto = resultToProto(res);
+      proto = resultToIndexDescProto(res);
     } catch (SQLException se) {
     } finally {
       CatalogUtil.closeSQLWrapper(res, stmt);
@@ -814,7 +972,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
         throw new IOException("ERROR: there is no index matched to "
             + tableName + "." + columnName);
       }
-      proto = resultToProto(res);
+      proto = resultToIndexDescProto(res);
     } catch (SQLException se) {
       new IOException(se);
     } finally {
@@ -903,7 +1061,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       }
       res = stmt.executeQuery();
       while (res.next()) {
-        protos.add(resultToProto(res));
+        protos.add(resultToIndexDescProto(res));
       }
     } catch (SQLException se) {
     } finally {
@@ -913,11 +1071,11 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     return protos.toArray(new IndexDescProto[protos.size()]);
   }
 
-  private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+  private IndexDescProto resultToIndexDescProto(final ResultSet res) throws SQLException {
     IndexDescProto.Builder builder = IndexDescProto.newBuilder();
     builder.setName(res.getString("index_name"));
     builder.setTableId(res.getString(C_TABLE_ID));
-    builder.setColumn(resultToColumnProto(res));
+    builder.setColumn(indexResultToColumnProto(res));
     builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
     builder.setIsUnique(res.getBoolean("is_unique"));
     builder.setIsClustered(res.getBoolean("is_clustered"));
@@ -925,10 +1083,60 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     return builder.build();
   }
 
+  /**
+   * Builds a ColumnProto from the current row of {@code res} for index lookups.
+   *
+   * <p>The INDEXES table doesn't store type_length, so this variant reads only
+   * column_name and data_type and always creates a simple data type.
+   *
+   * @param res result set positioned on the row to convert; it is not advanced here
+   * @return the ColumnProto built from that row
+   * @throws SQLException if a column cannot be read from the result set
+   */
+  private ColumnProto indexResultToColumnProto(final ResultSet res) throws SQLException {
+    final String columnName = res.getString("column_name").trim();
+    final Type dataType = getDataType(res.getString("data_type").trim());
+
+    return ColumnProto.newBuilder()
+        .setColumnName(columnName)
+        .setDataType(CatalogUtil.newSimpleDataType(dataType))
+        .build();
+  }
+
+  /**
+   * Converts the current row of {@code res} into a ColumnProto.
+   *
+   * <p>Reads column_name, data_type and type_length from the row. A positive
+   * type_length produces a data type carrying that length; any other value
+   * produces a simple data type.
+   *
+   * @param res result set positioned on the row to convert; it is not advanced here
+   * @return the ColumnProto built from that row
+   * @throws SQLException if a column cannot be read from the result set
+   */
   private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
     ColumnProto.Builder builder = ColumnProto.newBuilder();
-    builder.setColumnName(res.getString("column_name"));
-    builder.setDataType(CatalogUtil.newSimpleDataType(getDataType(res.getString("data_type").trim())));
+    builder.setColumnName(res.getString("column_name").trim());
+
+    Type type = getDataType(res.getString("data_type").trim());
+    int typeLength = res.getInt("type_length");
+    // ResultSet.getInt yields 0 for SQL NULL, so a NULL type_length also takes the simple-type branch.
+    if(typeLength > 0 ) {
+      builder.setDataType(CatalogUtil.newDataTypeWithLen(type, typeLength));
+    } else {
+      builder.setDataType(CatalogUtil.newSimpleDataType(type));
+    }
+
+    return builder.build();
+  }
+
+  /**
+   * Drains {@code res} into a KeyValueSetProto, adding one key/value entry per row.
+   *
+   * <p>Each row contributes its key_ and value_ columns. The result set is advanced
+   * until it is exhausted, so the caller cannot read further rows afterwards.
+   *
+   * @param res result set to consume
+   * @return the set of key/value pairs; empty when the result set has no rows
+   * @throws SQLException if the result set cannot be read
+   */
+  private CatalogProtos.KeyValueSetProto resultToKeyValueSetProto(final ResultSet res) throws SQLException {
+    final CatalogProtos.KeyValueSetProto.Builder keyVals = CatalogProtos.KeyValueSetProto.newBuilder();
+    while (res.next()) {
+      final CatalogProtos.KeyValueProto entry = CatalogProtos.KeyValueProto.newBuilder()
+          .setKey(res.getString("key_"))
+          .setValue(res.getString("value_"))
+          .build();
+      keyVals.addKeyval(entry);
+    }
+    return keyVals.build();
+  }
+
+  /**
+   * Converts the current row of {@code res} into a ColumnProto whose column name
+   * is qualified as {@code tableName + "." + column_name}.
+   *
+   * <p>A positive type_length produces a data type carrying that length; any
+   * other value produces a simple data type.
+   *
+   * @param tableName prefix placed before the column name
+   * @param res result set positioned on the row to convert; it is not advanced here
+   * @return the ColumnProto built from that row
+   * @throws SQLException if a column cannot be read from the result set
+   */
+  private ColumnProto resultToQualifiedColumnProto(String tableName, final ResultSet res) throws SQLException {
+    ColumnProto.Builder builder = ColumnProto.newBuilder();
+
+    String columnName = tableName + "."
+        + res.getString("column_name").trim();
+    builder.setColumnName(columnName);
+
+    Type type = getDataType(res.getString("data_type").trim());
+    int typeLength = res.getInt("type_length");
+    // ResultSet.getInt yields 0 for SQL NULL, so a NULL type_length also takes the simple-type branch.
+    if(typeLength > 0 ) {
+      builder.setDataType(CatalogUtil.newDataTypeWithLen(type, typeLength));
+    } else {
+      builder.setDataType(CatalogUtil.newSimpleDataType(type));
+    }
+
     return builder.build();
   }
 
@@ -943,9 +1151,52 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     }
   }
 
+  /**
+   * Builds a PartitionMethodProto for {@code tableName} from the current row of {@code res}.
+   *
+   * <p>Reads partition_type (resolved through PartitionType.valueOf), expression,
+   * and expression_schema (raw bytes parsed as a SchemaProto).
+   *
+   * @param tableName value stored as the table id of the result
+   * @param res result set positioned on the row to convert; it is not advanced here
+   * @return the partition method described by that row
+   * @throws SQLException if a column cannot be read from the result set
+   * @throws InvalidProtocolBufferException if expression_schema is not a valid SchemaProto
+   */
+  private CatalogProtos.PartitionMethodProto resultToPartitionMethodProto(final String tableName, final ResultSet res)
+      throws SQLException, InvalidProtocolBufferException {
+    return CatalogProtos.PartitionMethodProto.newBuilder()
+        .setTableId(tableName)
+        .setPartitionType(CatalogProtos.PartitionType.valueOf(res.getString("partition_type")))
+        .setExpression(res.getString("expression"))
+        .setExpressionSchema(SchemaProto.parseFrom(res.getBytes("expression_schema")))
+        .build();
+  }
+
+  /**
+   * Builds a PartitionDescProto from the current row of {@code res}.
+   *
+   * <p>Columns are read by position, so the query must select them in this order:
+   * 1 table id, 2 partition name, 3 ordinal position, 4 partition value, 5 path.
+   *
+   * @param res result set positioned on the row to convert; it is not advanced here
+   * @return the partition description built from that row
+   * @throws SQLException if a column cannot be read from the result set
+   */
+  private CatalogProtos.PartitionDescProto resultToPartitionDescProto(ResultSet res) throws SQLException {
+    return CatalogProtos.PartitionDescProto.newBuilder()
+        .setTableId(res.getString(1))
+        .setPartitionName(res.getString(2))
+        .setOrdinalPosition(res.getInt(3))
+        .setPartitionValue(res.getString(4))
+        .setPath(res.getString(5))
+        .build();
+  }
+
+  /**
+   * Closes the connection held in {@code conn} through CatalogUtil.closeSQLWrapper
+   * and logs the shutdown together with the catalog URI.
+   */
   @Override
   public void close() {
     CatalogUtil.closeSQLWrapper(conn);
     LOG.info("Shutdown database (" + catalogUri + ")");
   }
+
+  /**
+   * Placeholder: function registration is not implemented yet, so this call
+   * returns without storing anything.
+   *
+   * @param func the function descriptor; currently ignored
+   * @throws IOException declared by the signature; the empty body never throws it
+   */
+  @Override
+  public final void addFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet
+  }
+
+  /**
+   * Placeholder: function removal is not implemented yet, so this call returns
+   * without deleting anything.
+   *
+   * @param func the function descriptor; currently ignored
+   * @throws IOException declared by the signature; the empty body never throws it
+   */
+  @Override
+  public final void deleteFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet
+  }
+
+  /**
+   * Placeholder: the existence check for functions is not implemented yet.
+   *
+   * <p>NOTE(review): the method returns void, so it cannot report whether the
+   * function exists - confirm the intended return type against the interface.
+   *
+   * @param func the function descriptor; currently ignored
+   * @throws IOException declared by the signature; the empty body never throws it
+   */
+  @Override
+  public final void existFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet
+  }
+
+  /**
+   * Placeholder: listing function names is not implemented yet.
+   *
+   * @return always {@code null} for now, so callers must null-check the result
+   * @throws IOException declared by the signature; the current body never throws it
+   */
+  @Override
+  public final List<String> getAllFunctionNames() throws IOException {
+    // TODO - not implemented yet
+    return null;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
index 44f4e7a..8a824bd 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.catalog.store;
 
 import org.apache.tajo.catalog.FunctionDesc;
-import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 
 import java.io.Closeable;
@@ -27,16 +27,48 @@ import java.io.IOException;
 import java.util.List;
 
 public interface CatalogStore extends Closeable {
-  void addTable(TableDesc desc) throws IOException;
+  /*************************** TABLE ******************************/
+  void addTable(CatalogProtos.TableDescProto desc) throws IOException;
   
   boolean existTable(String name) throws IOException;
   
   void deleteTable(String name) throws IOException;
   
-  TableDesc getTable(String name) throws IOException;
+  CatalogProtos.TableDescProto getTable(String name) throws IOException;
   
   List<String> getAllTableNames() throws IOException;
-  
+
+
+  /************************ PARTITION METHOD **************************/
+  void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws IOException;
+
+  CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws IOException;
+
+  boolean existPartitionMethod(String tableName) throws IOException;
+
+  void delPartitionMethod(String tableName) throws IOException;
+
+
+  /************************** PARTITIONS *****************************/
+  void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws IOException;
+
+  void addPartition(CatalogProtos.PartitionDescProto partitionDescProto) throws IOException;
+
+  /**
+   * Get all partitions of a table
+   * @param tableName the table name
+   * @return
+   * @throws IOException
+   */
+  CatalogProtos.PartitionsProto getPartitions(String tableName) throws IOException;
+
+  CatalogProtos.PartitionDescProto getPartition(String partitionName) throws IOException;
+
+  void delPartition(String partitionName) throws IOException;
+
+  void delPartitions(String tableName) throws IOException;
+
+  /**************************** INDEX *******************************/
   void addIndex(IndexDescProto proto) throws IOException;
   
   void delIndex(String indexName) throws IOException;
@@ -49,7 +81,8 @@ public interface CatalogStore extends Closeable {
   boolean existIndex(String indexName) throws IOException;
   
   boolean existIndex(String tableName, String columnName) throws IOException;
-  
+
+  /************************** FUNCTION *****************************/
   IndexDescProto [] getIndexes(String tableName) throws IOException;
   
   void addFunction(FunctionDesc func) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index f21ab0e..6f1b612 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -22,34 +22,18 @@
 package org.apache.tajo.catalog.store;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.exception.InternalException;
 
 import java.io.IOException;
 import java.sql.*;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
+
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class DerbyStore extends AbstractDBStore {
   private static final String CATALOG_DRIVER="org.apache.derby.jdbc.EmbeddedDriver";
-  protected Lock rlock;
-  protected Lock wlock;
 
   protected String getCatalogDriverName(){
     return CATALOG_DRIVER;
@@ -57,7 +41,6 @@ public class DerbyStore extends AbstractDBStore {
   
+  /**
+   * Creates the store by passing {@code conf} to the AbstractDBStore constructor.
+   *
+   * @param conf configuration handed to the superclass
+   * @throws InternalException declared by the signature; presumably raised by the
+   *         superclass constructor - TODO confirm
+   */
   public DerbyStore(final Configuration conf)
       throws InternalException {
-
     super(conf);
   }
 
@@ -66,8 +49,7 @@ public class DerbyStore extends AbstractDBStore {
   }
 
   // TODO - DDL and index statements should be renamed
-  protected void createBaseTable() throws SQLException {
-    wlock.lock();
+  protected void createBaseTable() throws IOException {
     Statement stmt = null;
     try {
       // META
@@ -220,43 +202,72 @@ public class DerbyStore extends AbstractDBStore {
           LOG.debug(idx_stats_fk_table_name);
         }
         stmt.addBatch(idx_stats_fk_table_name);
+        stmt.executeBatch();
         LOG.info("Table '" + TB_STATISTICS + "' is created.");
       }
 
-      // PARTITION
+      // PARTITION_METHODS
+      if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
+        String partition_method_ddl = "CREATE TABLE " + TB_PARTITION_METHODS + " ("
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+            + "ON DELETE CASCADE, "
+            + "partition_type VARCHAR(10) NOT NULL,"
+            + "expression VARCHAR(1024) NOT NULL,"
+            + "expression_schema VARCHAR(1024) FOR BIT DATA NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(partition_method_ddl);
+        }
+        stmt.addBatch(partition_method_ddl);
+
+        String idx_partition_methods_table_name = "CREATE INDEX idx_partition_methods_table_name ON "
+            + TB_PARTITION_METHODS + " (" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_partition_methods_table_name);
+        }
+        stmt.addBatch(idx_partition_methods_table_name);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
+      }
+
+      // PARTITIONS
       if (!baseTableMaps.get(TB_PARTTIONS)) {
         String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
             + "PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-            + "name VARCHAR(255), "
-            + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
-            + "type VARCHAR(10) NOT NULL,"
-            + "quantity INT ,"
-            + "columns VARCHAR(255),"
-            + "expressions VARCHAR(1024)"
-            + ", CONSTRAINT PARTITION_PK PRIMARY KEY (PID)"
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+            + "ON DELETE CASCADE, "
+            + "partition_name VARCHAR(255), "
+            + "ordinal_position INT NOT NULL,"
+            + "partition_value VARCHAR(1024),"
+            + "path VARCHAR(1024),"
+            + "cache_nodes VARCHAR(255), "
+            + " CONSTRAINT PARTITION_PK PRIMARY KEY (PID)"
             + "   )";
-
         if (LOG.isDebugEnabled()) {
           LOG.debug(partition_ddl);
         }
         stmt.addBatch(partition_ddl);
-        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+
+        String idx_partitions_table_name = "CREATE INDEX idx_partitions_table_name ON "
+            + TB_PARTTIONS + " (" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_partitions_table_name);
+        }
+        stmt.addBatch(idx_partitions_table_name);
         stmt.executeBatch();
+        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
       }
 
+    } catch (SQLException se) {
+      throw new IOException(se);
     } finally {
-      wlock.unlock();
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
 
   @Override
-  protected boolean isInitialized() throws SQLException {
+  protected boolean isInitialized() throws IOException {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-    this.rlock = lock.readLock();
-    this.wlock = lock.writeLock();
 
-    wlock.lock();
     ResultSet res = null;
     int foundCount = 0;
     try {
@@ -269,13 +280,15 @@ public class DerbyStore extends AbstractDBStore {
       baseTableMaps.put(TB_OPTIONS, false);
       baseTableMaps.put(TB_STATISTICS, false);
       baseTableMaps.put(TB_INDEXES, false);
+      baseTableMaps.put(TB_PARTITION_METHODS, false);
       baseTableMaps.put(TB_PARTTIONS, false);
 
       while (res.next()) {
         baseTableMaps.put(res.getString("TABLE_NAME"), true);
       }
+    } catch (SQLException se){
+      throw  new IOException(se);
     } finally {
-      wlock.unlock();
       CatalogUtil.closeSQLWrapper(res);
     }
 
@@ -288,8 +301,7 @@ public class DerbyStore extends AbstractDBStore {
     return true;
   }
   
-  final boolean checkInternalTable(final String tableName) throws SQLException {
-    rlock.lock();
+  final boolean checkInternalTable(final String tableName) throws IOException {
     ResultSet res = null;
     try {
       boolean found = false;
@@ -301,786 +313,13 @@ public class DerbyStore extends AbstractDBStore {
       }
       
       return found;
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res);
-    }
-  }
-  
-  @Override
-  public final void addTable(final TableDesc table) throws IOException {
-    PreparedStatement pstmt = null;
-    Statement stmt = null;
-    ResultSet res = null;
-
-    String sql = 
-        "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
-        + "VALUES('" + table.getName() + "', "
-        + "'" + table.getPath() + "', "
-        + "'" + table.getMeta().getStoreType() + "'"
-        + ")";
-
-    wlock.lock();
-    try {
-      stmt = getConnection().createStatement();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      stmt.executeUpdate(sql);
-
-      sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID 
-          + " = '" + table.getName() + "'";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      res = stmt.executeQuery(sql);
-      if (!res.next()) {
-        throw new IOException("ERROR: there is no tid matched to " 
-            + table.getName());
-      }
-      int tid = res.getInt("TID");
-
-      String colSql;
-      int columnId = 0;
-      for (Column col : table.getSchema().getColumns()) {
-        colSql = columnToSQL(tid, table, columnId, col);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(colSql);
-        }
-        stmt.addBatch(colSql);
-        columnId++;
-      }
-      
-      Iterator<Entry<String,String>> it = table.getMeta().toMap().entrySet().iterator();
-      String optSql;
-      while (it.hasNext()) {
-        optSql = keyvalToSQL(table, it.next());
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(optSql);
-        }
-        stmt.addBatch(optSql);
-      }
-      stmt.executeBatch();
-      if (table.getStats() != null) {
-        sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
-            + "VALUES ('" + table.getName() + "', "
-            + table.getStats().getNumRows() + ","
-            + table.getStats().getNumBytes() + ")";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.executeUpdate(sql);
-      }
-
-      //Partition
-      if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
-        try {
-          PartitionDesc partitionDesc = table.getPartitions();
-          List<Column> columnList = partitionDesc.getColumns();
-
-          // Find columns which used for a partitioned table.
-          StringBuffer columns = new StringBuffer();
-          for(Column eachColumn : columnList) {
-            sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
-                + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug(sql);
-            }
-            res = stmt.executeQuery(sql);
-            if (!res.next()) {
-              throw new IOException("ERROR: there is no columnId matched to "
-                  + table.getName());
-            }
-            columnId = res.getInt("column_id");
-
-            if(columns.length() > 0) {
-              columns.append(",");
-            }
-            columns.append(columnId);
-          }
-
-          // Set default partition name. But if user named to subpartition, it would be updated.
-//          String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
-
-          sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
-              + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
-          pstmt = getConnection().prepareStatement(sql);
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(sql);
-          }
-
-          // Find information for subpartitions
-          if (partitionDesc.getSpecifiers() != null) {
-            int count = 1;
-            if (partitionDesc.getSpecifiers().size() == 0) {
-              pstmt.clearParameters();
-              pstmt.setString(1, null);
-              pstmt.setInt(2, tid);
-              pstmt.setString(3, partitionDesc.getPartitionsType().name());
-              pstmt.setInt(4, partitionDesc.getNumPartitions());
-              pstmt.setString(5, columns.toString());
-              pstmt.setString(6, null);
-              pstmt.addBatch();
-            } else {
-              for(Specifier eachValue: partitionDesc.getSpecifiers()) {
-                pstmt.clearParameters();
-                if (eachValue.getName() != null && !eachValue.getName().equals("")) {
-                  pstmt.setString(1, eachValue.getName());
-                } else {
-                  pstmt.setString(1, null);
-                }
-                pstmt.setInt(2, tid);
-                pstmt.setString(3, partitionDesc.getPartitionsType().name());
-                pstmt.setInt(4, partitionDesc.getNumPartitions());
-                pstmt.setString(5, columns.toString());
-                pstmt.setString(6, eachValue.getExpressions());
-                pstmt.addBatch();
-                count++;
-              }
-            }
-          } else {
-            pstmt.clearParameters();
-            pstmt.setString(1, null);
-            pstmt.setInt(2, tid);
-            pstmt.setString(3, partitionDesc.getPartitionsType().name());
-            pstmt.setInt(4, partitionDesc.getNumPartitions());
-            pstmt.setString(5, columns.toString());
-            pstmt.setString(6, null);
-            pstmt.addBatch();
-          }
-          pstmt.executeBatch();
-        } finally {
-          CatalogUtil.closeSQLWrapper(pstmt);
-        }
-      }
-
-    } catch (SQLException se) {
-      throw new IOException(se.getMessage(), se);
-    } finally {
-      wlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, pstmt);
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-  }
-  
-  private String columnToSQL(final int tid, final TableDesc desc, 
-      final int columnId, final Column col) {
-    String sql =
-        "INSERT INTO " + TB_COLUMNS 
-        + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type, type_length) "
-        + "VALUES("
-        + tid + ","
-        + "'" + desc.getName() + "',"
-        + columnId + ", "
-        + "'" + col.getColumnName() + "',"
-        + "'" + col.getDataType().getType().name() + "',"
-        + (col.getDataType().hasLength() ? col.getDataType().getLength() : 0)
-        + ")";
-    
-    return sql;
-  }
-  
-  private String keyvalToSQL(final TableDesc desc,
-      final Entry<String,String> keyVal) {
-    String sql =
-        "INSERT INTO " + TB_OPTIONS 
-        + " (" + C_TABLE_ID + ", key_, value_) "
-        + "VALUES("
-        + "'" + desc.getName() + "',"
-        + "'" + keyVal.getKey() + "',"
-        + "'" + keyVal.getValue() + "'"
-        + ")";
-    
-    return sql;
-  }
-  
-  @Override
-  public final boolean existTable(final String name) throws IOException {
-    StringBuilder sql = new StringBuilder();
-    sql.append("SELECT " + C_TABLE_ID + " from ")
-    .append(TB_TABLES)
-    .append(" WHERE " + C_TABLE_ID + " = '")
-    .append(name)
-    .append("'");
-
-    Statement stmt = null;
-    ResultSet res = null;
-    boolean exist = false;
-    rlock.lock();
-    try {
-      stmt = getConnection().createStatement();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql.toString());
-      }
-      res = stmt.executeQuery(sql.toString());
-      exist = res.next();
-    } catch (SQLException se) {
-      throw new IOException(se);
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    
-    return exist;
-  }
-
-  @Override
-  public final void deleteTable(final String name) throws IOException {
-    Statement stmt = null;
-    String sql = null;
-    try {
-      wlock.lock();
-      stmt = getConnection().createStatement();
-      try {
-        sql = "DELETE FROM " + TB_COLUMNS +
-            " WHERE " + C_TABLE_ID + " = '" + name + "'";
-        LOG.info(sql);
-        stmt.execute(sql);
-      } catch (SQLException se) {
-        throw new IOException(se);
-      }
-
-      try {
-        sql = "DELETE FROM " + TB_OPTIONS +
-            " WHERE " + C_TABLE_ID + " = '" + name + "'";
-        LOG.info(sql);
-        stmt.execute(sql);
-      } catch (SQLException se) {
-        throw new IOException(se);
-      }
-
-      try {
-        sql = "DELETE FROM " + TB_STATISTICS +
-            " WHERE " + C_TABLE_ID + " = '" + name + "'";
-        LOG.info(sql);
-        stmt.execute(sql);
-      } catch (SQLException se) {
-        throw new IOException(se);
-      }
-
-      try {
-        sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
-            + " SELECT TID FROM " + TB_TABLES
-            + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
-        LOG.info(sql);
-        stmt = getConnection().createStatement();
-        stmt.execute(sql);
-      } catch (SQLException se) {
-        throw new IOException(se);
-      }
-
-      try {
-        sql = "DELETE FROM " + TB_TABLES +
-            " WHERE " + C_TABLE_ID +" = '" + name + "'";
-        LOG.info(sql);
-        stmt.execute(sql);
-      } catch (SQLException se) {
-        throw new IOException(se);
-      }
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
-      wlock.unlock();
-      CatalogUtil.closeSQLWrapper(stmt);
-    }
-  }
-
-  @Override
-  public final TableDesc getTable(final String name) throws IOException {
-    ResultSet res = null;
-    Statement stmt = null;
-
-    String tableName = null;
-    Path path = null;
-    StoreType storeType = null;
-    Options options;
-    TableStats stat = null;
-    PartitionDesc partitionDesc = null;
-    int tid = 0;
-
-    try {
-      rlock.lock();
-      stmt = getConnection().createStatement();
-
-      try {
-        String sql = 
-            "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
-            + " WHERE " + C_TABLE_ID + "='" + name + "'";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        res = stmt.executeQuery(sql);
-        if (!res.next()) { // there is no table of the given name.
-          return null;
-        }
-        tableName = res.getString(C_TABLE_ID).trim();
-        path = new Path(res.getString("path").trim());
-        storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
-        tid = res.getInt("TID");
-      } catch (SQLException se) { 
-        throw new IOException(se);
-      } finally {
-        CatalogUtil.closeSQLWrapper(res);
-      }
-      
-      Schema schema = null;
-      try {
-        String sql = "SELECT column_name, data_type, type_length from " + TB_COLUMNS
-            + " WHERE " + C_TABLE_ID + "='" + name + "' ORDER by column_id asc";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        res = stmt.executeQuery(sql);
-
-        schema = new Schema();
-        while (res.next()) {
-          String columnName = tableName + "." 
-              + res.getString("column_name").trim();
-          Type dataType = getDataType(res.getString("data_type")
-              .trim());
-          int typeLength = res.getInt("type_length");
-          if (typeLength > 0) {
-            schema.addColumn(columnName, dataType, typeLength);
-          } else {
-            schema.addColumn(columnName, dataType);
-          }
-        }
-      } catch (SQLException se) {
-        throw new IOException(se);
-      } finally {
-        CatalogUtil.closeSQLWrapper(res);
-      }
-      
-      options = Options.create();
-      try {
-        String sql = "SELECT key_, value_ from " + TB_OPTIONS
-            + " WHERE " + C_TABLE_ID + "='" + name + "'";
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        res = stmt.executeQuery(sql);        
-        
-        while (res.next()) {
-          options.put(
-              res.getString("key_"),
-              res.getString("value_"));          
-        }
-      } catch (SQLException se) {
-        throw new IOException(se);
-      } finally {
-        CatalogUtil.closeSQLWrapper(res);
-      }
-
-      try {
-        String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
-            + " WHERE " + C_TABLE_ID + "='" + name + "'";
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-
-        res = stmt.executeQuery(sql);
-
-        if (res.next()) {
-          stat = new TableStats();
-          stat.setNumRows(res.getLong("num_rows"));
-          stat.setNumBytes(res.getLong("num_bytes"));
-        }
-      } catch (SQLException se) {
-        throw new IOException(se);
-      } finally {
-        CatalogUtil.closeSQLWrapper(res);
-      }
-
-
-      try {
-        String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
-            + " WHERE TID =" + tid + "";
-        stmt = getConnection().createStatement();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        res = stmt.executeQuery(sql);
-
-        while (res.next()) {
-          if (partitionDesc == null) {
-            partitionDesc = new PartitionDesc();
-            String[] columns = res.getString("columns").split(",");
-            for(String eachColumn: columns) {
-              partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
-            }
-            partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
-                ("type")));
-            partitionDesc.setNumPartitions(res.getInt("quantity"));
-          }
-
-          Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
-          partitionDesc.addSpecifier(specifier);
-        }
-
-      } catch (SQLException se) {
-        throw new IOException(se);
-      } finally {
-        CatalogUtil.closeSQLWrapper(res, stmt);
-      }
-
-      TableMeta meta = new TableMeta(storeType, options);
-      TableDesc table = new TableDesc(tableName, schema, meta, path);
-      if (stat != null) {
-        table.setStats(stat);
-      }
-
-      if (partitionDesc != null) {
-        table.setPartitions(partitionDesc);
-      }
-
-      return table;
-    } catch (SQLException se) {
-      throw new IOException(se);
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(stmt);
-    }
-  }
-
-  private Column getColumn(String tableName, int tid, String columnId) throws IOException {
-    ResultSet res = null;
-    Column column = null;
-    Statement stmt = null;
-
-    try {
-      String sql = "SELECT column_name, data_type, type_length from "
-          + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
-
-      stmt = getConnection().createStatement();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      res = stmt.executeQuery(sql);
-
-      if (res.next()) {
-        String columnName = tableName + "."
-            + res.getString("column_name").trim();
-        Type dataType = getDataType(res.getString("data_type")
-            .trim());
-        int typeLength = res.getInt("type_length");
-        if (typeLength > 0) {
-          column = new Column(columnName, dataType, typeLength);
-        } else {
-          column = new Column(columnName, dataType);
-        }
-      }
-    } catch (SQLException se) {
-      throw new IOException(se);
-    } finally {
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    return column;
-  }
-
-  private Type getDataType(final String typeStr) {
-    try {
-    return Enum.valueOf(Type.class, typeStr);
-    } catch (IllegalArgumentException iae) {
-      LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
-      return null;
-    }
-  }
-  
-  @Override
-  public final List<String> getAllTableNames() throws IOException {
-    String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
-    
-    Statement stmt = null;
-    ResultSet res = null;
-    
-    List<String> tables = new ArrayList<String>();
-    rlock.lock();
-    try {
-      stmt = getConnection().createStatement();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      res = stmt.executeQuery(sql);
-      while (res.next()) {
-        tables.add(res.getString(C_TABLE_ID).trim());
-      }
-    } catch (SQLException se) {
-      throw new IOException(se);
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    return tables;
-  }
-  
-  public final void addIndex(final IndexDescProto proto) throws IOException {
-    String sql = 
-        "INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, " 
-        +"data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
-        +"(?,?,?,?,?,?,?,?)";
-    
-    PreparedStatement stmt = null;
-
-    wlock.lock();
-    try {
-      stmt = getConnection().prepareStatement(sql);
-      stmt.setString(1, proto.getName());
-      stmt.setString(2, proto.getTableId());
-      stmt.setString(3, proto.getColumn().getColumnName());
-      stmt.setString(4, proto.getColumn().getDataType().getType().name());
-      stmt.setString(5, proto.getIndexMethod().toString());
-      stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
-      stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
-      stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
-      stmt.executeUpdate();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(stmt.toString());
-      }
-    } catch (SQLException se) {
-      throw new IOException(se);
-    } finally {
-      wlock.unlock();
-      CatalogUtil.closeSQLWrapper(stmt);
-    }
-  }
-  
-  public final void delIndex(final String indexName) throws IOException {
-    String sql =
-        "DELETE FROM " + TB_INDEXES
-        + " WHERE index_name='" + indexName + "'";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(sql);
-    }
-      
-      Statement stmt = null;
-      wlock.lock(); 
-      try {
-        stmt = getConnection().createStatement();
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(sql);
-        }
-        stmt.executeUpdate(sql);
-      } catch (SQLException se) {
-        throw new IOException(se);
-      } finally {
-        wlock.unlock();
-        CatalogUtil.closeSQLWrapper(stmt);
-      }
-  }
-  
-  public final IndexDescProto getIndex(final String indexName) 
-      throws IOException {
-    ResultSet res = null;
-    PreparedStatement stmt = null;
-    
-    IndexDescProto proto = null;
-    
-    rlock.lock();
-    try {
-      String sql = 
-          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
-          + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
-          + "where index_name = ?";
-      stmt = getConnection().prepareStatement(sql);
-      stmt.setString(1, indexName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(stmt.toString());
-      }
-      res = stmt.executeQuery();
-      if (!res.next()) {
-        throw new IOException("ERROR: there is no index matched to " + indexName);
-      }
-      proto = resultToProto(res);
-    } catch (SQLException se) {
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    
-    return proto;
-  }
-  
-  public final IndexDescProto getIndex(final String tableName, 
-      final String columnName) throws IOException {
-    ResultSet res = null;
-    PreparedStatement stmt = null;
-    
-    IndexDescProto proto = null;
-    
-    rlock.lock();
-    try {
-      String sql = 
-          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
-          + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
-          + "where " + C_TABLE_ID + " = ? AND column_name = ?";
-      stmt = getConnection().prepareStatement(sql);
-      stmt.setString(1, tableName);
-      stmt.setString(2, columnName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      res = stmt.executeQuery();
-      if (!res.next()) {
-        throw new IOException("ERROR: there is no index matched to " 
-            + tableName + "." + columnName);
-      }      
-      proto = resultToProto(res);
-    } catch (SQLException se) {
-      new IOException(se);
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    
-    return proto;
-  }
-  
-  public final boolean existIndex(final String indexName) throws IOException {
-    String sql = "SELECT index_name from " + TB_INDEXES 
-        + " WHERE index_name = ?";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(sql);
-    }
-
-    PreparedStatement stmt = null;
-    ResultSet res = null;
-    boolean exist = false;
-    rlock.lock();
-    try {
-      stmt = getConnection().prepareStatement(sql);
-      stmt.setString(1, indexName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      res = stmt.executeQuery();
-      exist = res.next();
-    } catch (SQLException se) {
-      
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    
-    return exist;
-  }
-  
-  @Override
-  public boolean existIndex(String tableName, String columnName)
-      throws IOException {
-    String sql = "SELECT index_name from " + TB_INDEXES 
-        + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(sql);
-    }
-
-    PreparedStatement stmt = null;
-    ResultSet res = null;
-    boolean exist = false;
-    rlock.lock();
-    try {
-      stmt = getConnection().prepareStatement(sql);
-      stmt.setString(1, tableName);
-      stmt.setString(2, columnName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      res = stmt.executeQuery();
-      exist = res.next();
-    } catch (SQLException se) {
-      
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    
-    return exist;
-  }
-  
-  public final IndexDescProto [] getIndexes(final String tableName) 
-      throws IOException {
-    ResultSet res = null;
-    PreparedStatement stmt = null;
-    
-    List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
-    
-    rlock.lock();
-    try {
-      String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
-          + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
-          + "where " + C_TABLE_ID + "= ?";
-      stmt = getConnection().prepareStatement(sql);
-      stmt.setString(1, tableName);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(sql);
-      }
-      res = stmt.executeQuery();      
-      while (res.next()) {
-        protos.add(resultToProto(res));
-      }
-    } catch (SQLException se) {
-    } finally {
-      rlock.unlock();
-      CatalogUtil.closeSQLWrapper(res, stmt);
-    }
-    
-    return protos.toArray(new IndexDescProto [protos.size()]);
-  }
-  
-  private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
-    IndexDescProto.Builder builder = IndexDescProto.newBuilder();
-    builder.setName(res.getString("index_name"));
-    builder.setTableId(res.getString(C_TABLE_ID));
-    builder.setColumn(resultToColumnProto(res));
-    builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
-    builder.setIsUnique(res.getBoolean("is_unique"));
-    builder.setIsClustered(res.getBoolean("is_clustered"));
-    builder.setIsAscending(res.getBoolean("is_ascending"));
-    return builder.build();
-  }
-  
-  private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
-    ColumnProto.Builder builder = ColumnProto.newBuilder();
-    builder.setColumnName(res.getString("column_name"));
-    builder.setDataType(CatalogUtil.newSimpleDataType(
-        getDataType(res.getString("data_type").trim())));
-    return builder.build();
-  }
-  
-  private IndexMethod getIndexMethod(final String typeStr) {
-    if (typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString())) {
-      return IndexMethod.TWO_LEVEL_BIN_TREE;
-    } else {
-      LOG.error("Cannot find a matched type aginst from '"
-          + typeStr + "'");
-      // TODO - needs exception handling
-      return null;
+      CatalogUtil.closeSQLWrapper(res);
     }
   }
   
-  @Override
-  public final void addFunction(final FunctionDesc func) throws IOException {
-    // TODO - not implemented yet    
-  }
-
-  @Override
-  public final void deleteFunction(final FunctionDesc func) throws IOException {
-    // TODO - not implemented yet    
-  }
-
-  @Override
-  public final void existFunction(final FunctionDesc func) throws IOException {
-    // TODO - not implemented yet    
-  }
-
-  @Override
-  public final List<String> getAllFunctionNames() throws IOException {
-    // TODO - not implemented yet
-    return null;
-  }
 
   @Override
   public final void close() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index 77b258e..984ba6a 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -23,8 +23,9 @@ package org.apache.tajo.catalog.store;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.FunctionDesc;
-import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 
 import java.io.IOException;
@@ -33,8 +34,8 @@ import java.util.List;
 import java.util.Map;
 
 public class MemStore implements CatalogStore {
-  private final Map<String,TableDesc> tables = Maps.newHashMap();
-  private final Map<String, FunctionDesc> functions = Maps.newHashMap();
+  private final Map<String,CatalogProtos.TableDescProto> tables = Maps.newHashMap();
+  private final Map<String, CatalogProtos.FunctionDescProto> functions = Maps.newHashMap();
   private final Map<String, IndexDescProto> indexes = Maps.newHashMap();
   private final Map<String, IndexDescProto> indexesByColumn = Maps.newHashMap();
   
@@ -55,9 +56,10 @@ public class MemStore implements CatalogStore {
    * @see CatalogStore#addTable(TableDesc)
    */
   @Override
-  public void addTable(TableDesc desc) throws IOException {
+  public void addTable(CatalogProtos.TableDescProto desc) throws IOException {
     synchronized(tables) {
-      tables.put(desc.getName(), desc);
+      String tableId = desc.getId().toLowerCase(); // the map key is the lower-cased table id
+      tables.put(tableId, desc); // put() replaces any descriptor already stored under this key
     }
   }
 
@@ -67,7 +69,8 @@ public class MemStore implements CatalogStore {
   @Override
   public boolean existTable(String name) throws IOException {
     synchronized(tables) {
-      return tables.containsKey(name);
+      String tableId = name.toLowerCase(); // same lower-casing that addTable applies to its keys
+      return tables.containsKey(tableId); // true only if a descriptor is stored under the lower-cased name
     }
   }
 
@@ -77,7 +80,8 @@ public class MemStore implements CatalogStore {
   @Override
   public void deleteTable(String name) throws IOException {
     synchronized(tables) {
-      tables.remove(name);
+      String tableId = name.toLowerCase(); // same lower-casing that addTable applies to its keys
+      tables.remove(tableId); // no error is raised when nothing is stored under this key
     }
   }
 
@@ -85,8 +89,16 @@ public class MemStore implements CatalogStore {
    * @see CatalogStore#getTable(java.lang.String)
    */
   @Override
-  public TableDesc getTable(String name) throws IOException {
-    return tables.get(name);
+  public CatalogProtos.TableDescProto getTable(String name) throws IOException {
+    String tableId = name.toLowerCase(); // lookup key is the lower-cased name
+    CatalogProtos.TableDescProto unqualified = tables.get(tableId); // NOTE(review): read happens outside synchronized(tables), unlike addTable/existTable/deleteTable
+    if(unqualified == null)
+      return null; // nothing stored under this key; callers receive null rather than an exception
+    CatalogProtos.TableDescProto.Builder builder = CatalogProtos.TableDescProto.newBuilder();
+    CatalogProtos.SchemaProto schemaProto = CatalogUtil.getQualfiedSchema(tableId, unqualified.getSchema()); // presumably qualifies column names with tableId -- confirm in CatalogUtil
+    builder.mergeFrom(unqualified); // start from a copy of every field of the stored descriptor
+    builder.setSchema(schemaProto); // then replace only the schema in the returned copy
+    return builder.build();
   }
 
   /* (non-Javadoc)
@@ -97,6 +109,60 @@ public class MemStore implements CatalogStore {
     return new ArrayList<String>(tables.keySet());
   }
 
+  @Override
+  public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws IOException {
+    throw new IOException("not supported!"); // unconditional: MemStore does not implement this operation; the argument is never read
+  }
+
+  @Override
+  public CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws IOException {
+    String tableId = tableName.toLowerCase(); // lower-cased to match the keys used by addTable
+    CatalogProtos.TableDescProto table = tables.get(tableId); // NOTE(review): read happens outside synchronized(tables)
+    return (table != null && table.hasPartition()) ? table.getPartition() : null; // null when the table is unknown or its descriptor has no partition field set
+  }
+
+  @Override
+  public boolean existPartitionMethod(String tableName) throws IOException {
+    String tableId = tableName.toLowerCase(); // lower-cased to match the keys used by addTable
+    CatalogProtos.TableDescProto table = tables.get(tableId); // NOTE(review): read happens outside synchronized(tables)
+    return (table != null && table.hasPartition()); // false when the table is unknown or its descriptor has no partition field set
+  }
+
+  @Override
+  public void delPartitionMethod(String tableName) throws IOException {
+    throw new IOException("not supported!"); // unconditional: MemStore does not implement this operation; tableName is never read
+  }
+
+  @Override
+  public void addPartitions(CatalogProtos.PartitionsProto partitionDescList) throws IOException {
+    throw new IOException("not supported!"); // unconditional: MemStore does not implement this operation; the argument is never read
+  }
+
+  @Override
+  public void addPartition(CatalogProtos.PartitionDescProto partitionDesc) throws IOException {
+    throw new IOException("not supported!"); // unconditional: MemStore does not implement this operation; the argument is never read
+  }
+
+  @Override
+  public CatalogProtos.PartitionsProto getPartitions(String tableName) throws IOException {
+    throw new IOException("not supported!"); // unconditional: never returns a value, so callers must handle IOException
+  }
+
+  @Override
+  public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws IOException {
+    throw new IOException("not supported!"); // unconditional: never returns a value, so callers must handle IOException
+  }
+
+  @Override
+  public void delPartition(String partitionName) throws IOException {
+    throw new IOException("not supported!"); // unconditional: MemStore does not implement this operation; partitionName is never read
+  }
+
+  @Override
+  public void delPartitions(String tableName) throws IOException {
+    throw new IOException("not supported!"); // unconditional: MemStore does not implement this operation; tableName is never read
+  }
+
   /* (non-Javadoc)
    * @see CatalogStore#addIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
    */

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
index c368969..e9c5a03 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -23,12 +23,10 @@ package org.apache.tajo.catalog.store;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.exception.InternalException;
 
 import java.io.IOException;
 import java.sql.*;
-import java.util.List;
 import java.util.Map;
 
 public class MySQLStore extends AbstractDBStore  {
@@ -50,13 +48,14 @@ public class MySQLStore extends AbstractDBStore  {
   }
 
   // TODO - DDL and index statements should be renamed
-  protected void createBaseTable() throws SQLException {
-
+  @Override
+  protected void createBaseTable() throws IOException {
     int result;
     Statement stmt = null;
-    try {
-      stmt = getConnection().createStatement();
+    Connection conn = getConnection();
 
+    try {
+      stmt = conn.createStatement();
       // META
       if (!baseTableMaps.get(TB_META)) {
         String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
@@ -88,12 +87,10 @@ public class MySQLStore extends AbstractDBStore  {
       if (!baseTableMaps.get(TB_COLUMNS)) {
         String columns_ddl =
             "CREATE TABLE " + TB_COLUMNS + " ("
-                + "TID INT NOT NULL,"
                 + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
                 + "column_id INT NOT NULL,"
                 + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
                 + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
-                + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
                 + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
         if (LOG.isDebugEnabled()) {
           LOG.debug(columns_ddl);
@@ -152,38 +149,61 @@ public class MySQLStore extends AbstractDBStore  {
         result = stmt.executeUpdate(stats_ddl);
       }
 
-      // PARTITION
+      // PARTITION_METHODS
+      if (!baseTableMaps.get(TB_PARTITION_METHODS)) {
+        String partition_method_ddl = "CREATE TABLE " + TB_PARTITION_METHODS + " ("
+            + C_TABLE_ID + " VARCHAR(255) PRIMARY KEY,"
+            + "partition_type VARCHAR(10) NOT NULL,"
+            + "expression TEXT NOT NULL,"
+            + "expression_schema VARBINARY(1024) NOT NULL, "
+            + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(partition_method_ddl);
+        }
+        LOG.info("Table '" + TB_PARTITION_METHODS + "' is created.");
+        result = stmt.executeUpdate(partition_method_ddl);
+      }
+
+      // PARTITIONS
       if (!baseTableMaps.get(TB_PARTTIONS)) {
         String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
             + "PID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
-            + "name VARCHAR(255), "
-            + "TID INT NOT NULL,"
-            + "type VARCHAR(10) NOT NULL,"
-            + "quantity INT ,"
-            + "columns VARCHAR(255),"
-            + "expressions TEXT )";
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+            + "partition_name VARCHAR(255), "
+            + "ordinal_position INT NOT NULL,"
+            + "partition_value TEXT,"
+            + "path TEXT,"
+            + "cache_nodes VARCHAR(255), "
+            + "UNIQUE KEY(" + C_TABLE_ID + ", partition_name),"
+            + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
         if (LOG.isDebugEnabled()) {
           LOG.debug(partition_ddl);
         }
         LOG.info("Table '" + TB_PARTTIONS + "' is created.");
         result = stmt.executeUpdate(partition_ddl);
       }
+    } catch (SQLException se) {
+      throw new IOException(se);
     } finally {
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
 
-  protected boolean isInitialized() throws SQLException {
-    ResultSet res = getConnection().getMetaData().getTables(null, null, null,
-        new String[]{"TABLE"});
-
+  @Override
+  protected boolean isInitialized() throws IOException {
+    ResultSet res = null;
+    Connection conn = getConnection();
     try {
+      res = conn.getMetaData().getTables(null, null, null,
+          new String[]{"TABLE"});
+
       baseTableMaps.put(TB_META, false);
       baseTableMaps.put(TB_TABLES, false);
       baseTableMaps.put(TB_COLUMNS, false);
       baseTableMaps.put(TB_OPTIONS, false);
       baseTableMaps.put(TB_STATISTICS, false);
       baseTableMaps.put(TB_INDEXES, false);
+      baseTableMaps.put(TB_PARTITION_METHODS, false);
       baseTableMaps.put(TB_PARTTIONS, false);
 
       if (res.wasNull())
@@ -192,6 +212,8 @@ public class MySQLStore extends AbstractDBStore  {
       while (res.next()) {
         baseTableMaps.put(res.getString("TABLE_NAME"), true);
       }
+    } catch(SQLException se) {
+      throw new IOException(se);
     } finally {
       CatalogUtil.closeSQLWrapper(res);
     }
@@ -206,24 +228,4 @@ public class MySQLStore extends AbstractDBStore  {
 //    return false;
   }
 
-  @Override
-  public final void addFunction(final FunctionDesc func) throws IOException {
-    // TODO - not implemented yet    
-  }
-
-  @Override
-  public final void deleteFunction(final FunctionDesc func) throws IOException {
-    // TODO - not implemented yet    
-  }
-
-  @Override
-  public final void existFunction(final FunctionDesc func) throws IOException {
-    // TODO - not implemented yet    
-  }
-
-  @Override
-  public final List<String> getAllFunctionNames() throws IOException {
-    // TODO - not implemented yet
-    return null;
-  }
 }