You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2013/12/03 10:29:42 UTC

[2/2] git commit: TAJO-284: Add table partitioning entry to Catalog. (jaehwa)

TAJO-284: Add table partitioning entry to Catalog. (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/0b0de13b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/0b0de13b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/0b0de13b

Branch: refs/heads/master
Commit: 0b0de13b2444f9e75ad3e1f42cba51ddb1f86dc2
Parents: 29a0aa0
Author: blrunner <jh...@gruter.com>
Authored: Tue Dec 3 18:29:22 2013 +0900
Committer: blrunner <jh...@gruter.com>
Committed: Tue Dec 3 18:29:22 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tajo/algebra/CreateTable.java    |  11 +-
 .../apache/tajo/catalog/CatalogConstants.java   |   1 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |   3 +
 .../org/apache/tajo/catalog/DDLBuilder.java     |  70 +++
 .../java/org/apache/tajo/catalog/Schema.java    |   2 +-
 .../java/org/apache/tajo/catalog/TableDesc.java |  33 +-
 .../tajo/catalog/partition/Partitions.java      | 349 +++++++++++++
 .../tajo/catalog/partition/Specifier.java       | 128 +++++
 .../src/main/proto/CatalogProtos.proto          |  20 +
 tajo-catalog/tajo-catalog-server/pom.xml        |   4 +
 .../org/apache/tajo/catalog/CatalogServer.java  |   5 +-
 .../tajo/catalog/store/AbstractDBStore.java     | 184 ++++++-
 .../apache/tajo/catalog/store/DerbyStore.java   | 488 +++++++++++++------
 .../apache/tajo/catalog/store/MySQLStore.java   | 204 +++++---
 .../org/apache/tajo/catalog/TestCatalog.java    | 225 ++++++++-
 .../org/apache/tajo/catalog/TestDBStore.java    | 174 +++++++
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  36 ++
 .../apache/tajo/engine/planner/LogicalPlan.java |  13 +-
 .../tajo/engine/planner/LogicalPlanner.java     | 135 ++++-
 .../engine/planner/logical/CreateTableNode.java |  18 +-
 .../engine/planner/logical/StoreTableNode.java  |  26 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   9 +-
 .../tajo/master/TajoMasterClientService.java    |   6 +-
 .../src/main/proto/ClientProtos.proto           |   1 +
 .../org/apache/tajo/client/TestTajoClient.java  | 122 +++++
 26 files changed, 2022 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ab0e893..8c53d9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-284: Add table partitioning entry to Catalog. (jaehwa)
+
     TAJO-317: Improve TajoResourceManager to support more elaborate resource 
     management. (Keuntae Park via jihoon)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
index 6e36f3a..41276ad 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
@@ -279,7 +279,6 @@ public class CreateTable extends Expr {
   }
 
   public static class RangePartitionSpecifier extends PartitionSpecifier {
-    String name;
     Expr end;
     boolean maxValue;
 
@@ -293,10 +292,6 @@ public class CreateTable extends Expr {
       maxValue = true;
     }
 
-    public String getName() {
-      return name;
-    }
-
     public Expr getEnd() {
       return end;
     }
@@ -320,10 +315,14 @@ public class CreateTable extends Expr {
   }
 
   public static class PartitionSpecifier {
-    String name;
+    private String name;
 
     public PartitionSpecifier(String name) {
       this.name = name;
     }
+
+    public String getName() {
+      return this.name;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 4b1f794..ed23b08 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -35,6 +35,7 @@ public class CatalogConstants {
   public static final String TB_OPTIONS = "OPTIONS";
   public static final String TB_INDEXES = "INDEXES";
   public static final String TB_STATISTICS = "STATS";
+  public static final String TB_PARTTIONS = "PARTITIONS";
   public static final String C_TABLE_ID = "TABLE_ID";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 6b4848c..dc91035 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -27,6 +27,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.sql.PreparedStatement;
 import java.sql.Wrapper;
 import java.util.Collection;
 
@@ -148,6 +149,8 @@ public class CatalogUtil {
       try{
         if(w instanceof Statement){
           ((Statement)w).close();
+        } else if(w instanceof PreparedStatement){
+            ((PreparedStatement)w).close();
         } else if(w instanceof ResultSet){
           ((ResultSet)w).close();
         } else if(w instanceof Connection){

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
index e6cc46d..a9d0f03 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.catalog;
 
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 
 import java.util.Map;
@@ -38,6 +41,10 @@ public class DDLBuilder {
     buildWithClause(sb, desc.getMeta());
     buildLocationClause(sb, desc);
 
+    if (desc.getPartitions() != null) {
+      buildPartitionClause(sb, desc);
+    }
+
     sb.append(";");
     return sb.toString();
   }
@@ -87,4 +94,67 @@ public class DDLBuilder {
   private static void buildLocationClause(StringBuilder sb, TableDesc desc) {
     sb.append(" LOCATION '").append(desc.getPath()).append("'");
   }
+
+  private static void buildPartitionClause(StringBuilder sb, TableDesc desc) {
+    Partitions partitions = desc.getPartitions();
+
+    sb.append(" PARTITION BY ");
+    sb.append(partitions.getPartitionsType().name());
+
+    // columns: emit only partition columns whose name also appears in the table schema
+    sb.append("(");
+    int columnCount = 0;
+    for(Column column: partitions.getColumns()) {
+      for(Column targetColumn: desc.getSchema().getColumns()) {
+        if (column.getColumnName().equals(targetColumn.getColumnName()))  {
+          if (columnCount > 0)
+            sb.append(",");
+
+          sb.append(column.getColumnName());
+          columnCount++;
+        }
+      }
+    }
+    sb.append(")");
+
+    // specifier: name/expressions are optional and may be null, so guard before isEmpty()
+    if (partitions.getSpecifiers() != null
+        && !partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
+
+      sb.append(" (");
+      for(int i = 0; i < partitions.getSpecifiers().size(); i++) {
+        Specifier specifier = partitions.getSpecifiers().get(i);
+        if (i > 0)
+          sb.append(",");
+
+        sb.append(" PARTITION");
+
+        if (specifier.getName() != null && !specifier.getName().isEmpty())
+          sb.append(" ").append(specifier.getName());
+
+        if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
+          if (specifier.getExpressions() != null && !specifier.getExpressions().isEmpty()) {
+            sb.append(" VALUES (");
+            String[] expressions = specifier.getExpressions().split("\\,");
+            for(int j = 0; j < expressions.length; j++) {
+              if (j > 0)
+                sb.append(",");
+              sb.append("'").append(expressions[j]).append("'");
+            }
+            sb.append(")");
+
+          }
+        } else if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE))  {
+          sb.append(" VALUES LESS THAN (");
+          if (specifier.getExpressions() != null && !specifier.getExpressions().isEmpty()) {
+            sb.append(specifier.getExpressions());
+          } else {
+            sb.append("MAXVALUE");
+          }
+          sb.append(")");
+        }
+      }
+      sb.append(")");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 7c0de81..8a2d028 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -212,7 +212,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
     }
   }
 
-	@Override
+  @Override
 	public boolean equals(Object o) {
 		if (o instanceof Schema) {
 		  Schema other = (Schema) o;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index f59feef..458a99a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -21,8 +21,11 @@ package org.apache.tajo.catalog;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.partition.Partitions;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -30,6 +33,8 @@ import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.json.GsonObject;
 
 public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Cloneable {
+  private final Log LOG = LogFactory.getLog(TableDesc.class);
+
   protected TableDescProto.Builder builder = null;
   
 	@Expose protected String tableName; // required
@@ -37,6 +42,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
   @Expose protected TableMeta meta; // required
   @Expose protected Path uri; // required
   @Expose	protected TableStats stats; // optional
+  @Expose protected Partitions partitions; //optional
   
 	public TableDesc() {
 		builder = TableDescProto.newBuilder();
@@ -48,7 +54,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
 	  this.tableName = tableName.toLowerCase();
     this.schema = schema;
 	  this.meta = info;
-	  this.uri = path;	   
+	  this.uri = path;
 	}
 	
 	public TableDesc(String tableName, Schema schema, StoreType type, Options options, Path path) {
@@ -58,6 +64,9 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
 	public TableDesc(TableDescProto proto) {
 	  this(proto.getId(), new Schema(proto.getSchema()), new TableMeta(proto.getMeta()), new Path(proto.getPath()));
     this.stats = new TableStats(proto.getStats());
+    if (proto.hasPartitions()) { // optional field: the getter never returns null, so ask the presence accessor
+      this.partitions = new Partitions(proto.getPartitions());
+    }
 	}
 	
 	public void setName(String tableId) {
@@ -104,8 +113,20 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
   public TableStats getStats() {
     return this.stats;
   }
-	
-	public boolean equals(Object object) {
+
+  public boolean hasPartitions() {
+    return this.partitions != null;
+  }
+
+  public Partitions getPartitions() {
+    return partitions;
+  }
+
+  public void setPartitions(Partitions partitions) {
+    this.partitions = partitions;
+  }
+
+  public boolean equals(Object object) {
     if(object instanceof TableDesc) {
       TableDesc other = (TableDesc) object;
       
@@ -123,6 +144,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
     desc.meta = (TableMeta) meta.clone();
     desc.uri = uri;
     desc.stats = stats != null ? (TableStats) stats.clone() : null;
+    desc.partitions = partitions != null ? (Partitions) partitions.clone() : null;
 	  
 	  return desc;
 	}
@@ -154,7 +176,10 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
       builder.setPath(this.uri.toString());
     }
     if (this.stats != null) {
-      builder.setStats(stats.getProto());
+      builder.setStats(this.stats.getProto());
+    }
+    if (this.partitions != null) {
+      builder.setPartitions(this.partitions.getProto());
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
new file mode 100644
index 0000000..c82f0cb
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.exception.AlreadyExistsFieldException;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+public class Partitions implements ProtoObject<CatalogProtos.PartitionsProto>, Cloneable, GsonObject {
+
+  private static final Log LOG = LogFactory.getLog(Partitions.class);
+
+  @Expose protected CatalogProtos.PartitionsType partitionsType; //required
+  @Expose protected List<Column> columns; //required
+  @Expose protected int numPartitions; //optional
+  @Expose protected List<Specifier> specifiers; //optional
+  @Expose protected Map<String, Integer> columnsByQialifiedName = null;
+  @Expose protected Map<String, List<Integer>> columnsByName = null;
+
+  private CatalogProtos.PartitionsProto.Builder builder = CatalogProtos.PartitionsProto.newBuilder();
+
+  public Partitions() {
+    this.columns = new ArrayList<Column>();
+    this.columnsByQialifiedName = new TreeMap<String, Integer>();
+    this.columnsByName = new HashMap<String, List<Integer>>();
+  }
+
+  public Partitions(Partitions partition) {
+    this();
+    this.partitionsType = partition.partitionsType;
+    this.columns.addAll(partition.columns);
+    this.columnsByQialifiedName.putAll(partition.columnsByQialifiedName);
+    this.columnsByName.putAll(partition.columnsByName);
+    this.numPartitions = partition.numPartitions;
+    this.specifiers = partition.specifiers;
+  }
+
+  public Partitions(CatalogProtos.PartitionsType partitionsType, Column[] columns, int numPartitions,
+                   List<Specifier> specifiers) {
+    this();
+    this.partitionsType = partitionsType;
+    for (Column c : columns) {
+      addColumn(c);
+    }
+    this.numPartitions = numPartitions;
+    this.specifiers = specifiers;
+  }
+
+  public Partitions(CatalogProtos.PartitionsProto proto) {
+    this.partitionsType = proto.getPartitionsType();
+    this.columns = new ArrayList<Column>();
+    this.columnsByQialifiedName = new HashMap<String, Integer>();
+    this.columnsByName = new HashMap<String, List<Integer>>();
+    for (CatalogProtos.ColumnProto colProto : proto.getColumnsList()) {
+      Column tobeAdded = new Column(colProto);
+      columns.add(tobeAdded);
+      if (tobeAdded.hasQualifier()) {
+        columnsByQialifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(),
+            columns.size() - 1);
+      } else {
+        columnsByQialifiedName.put(tobeAdded.getColumnName(), columns.size() - 1);
+      }
+      if (columnsByName.containsKey(tobeAdded.getColumnName())) {
+        columnsByName.get(tobeAdded.getColumnName()).add(columns.size() - 1);
+      } else {
+        columnsByName.put(tobeAdded.getColumnName(), TUtil.newList(columns.size() - 1));
+      }
+    }
+    this.numPartitions = proto.getNumPartitions();
+    if(proto.getSpecifiersList() != null) {
+      this.specifiers = TUtil.newList();
+      for(CatalogProtos.SpecifierProto specifier: proto.getSpecifiersList()) {
+        this.specifiers.add(new Specifier(specifier));
+      }
+    }
+  }
+
+  /**
+   * Set a qualifier to the partition columns without forcing it.
+   * Only columns that have no qualifier yet receive it (delegates with force = false).
+   *
+   * @param qualifier The qualifier
+   */
+  public void setQualifier(String qualifier) {
+    setQualifier(qualifier, false);
+  }
+
+  /**
+   * Set a qualifier to the partition columns and rebuild the qualified-name index.
+   * If force is true every column is re-qualified; otherwise already-qualified columns keep theirs.
+   *
+   * @param qualifier The qualifier
+   * @param force     If true, all columns' qualifiers will be changed. Otherwise,
+   *                  only columns without a qualifier will
+   *                  be changed.
+   */
+  public void setQualifier(String qualifier, boolean force) {
+    columnsByQialifiedName.clear();
+
+    for (int i = 0; i < getColumnNum(); i++) {
+      if (force || !columns.get(i).hasQualifier()) {
+        columns.get(i).setQualifier(qualifier);
+      }
+      // The index was cleared above, so every column (changed or not) must be re-registered.
+      columnsByQialifiedName.put(columns.get(i).getQualifiedName(), i);
+    }
+  }
+
+  public int getColumnNum() {
+    return this.columns.size();
+  }
+
+  public Column getColumn(int id) {
+    return columns.get(id);
+  }
+
+  public Column getColumnByFQN(String qualifiedName) {
+    Integer cid = columnsByQialifiedName.get(qualifiedName.toLowerCase());
+    return cid != null ? columns.get(cid) : null;
+  }
+
+  public Column getColumnByName(String colName) {
+    String normalized = colName.toLowerCase();
+    List<Integer> list = columnsByName.get(normalized);
+
+    if (list == null || list.size() == 0) {
+      return null;
+    }
+
+    if (list.size() == 1) {
+      return columns.get(list.get(0));
+    } else {
+      StringBuilder sb = new StringBuilder();
+      boolean first = true;
+      for (Integer id : list) {
+        if (first) {
+          first = false;
+        } else {
+          sb.append(", ");
+        }
+        sb.append(columns.get(id));
+      }
+      throw new RuntimeException("Ambiguous Column Name: " + sb.toString());
+    }
+  }
+
+  public int getColumnId(String qualifiedName) {
+    return columnsByQialifiedName.get(qualifiedName.toLowerCase());
+  }
+
+  public int getColumnIdByName(String colName) {
+    for (Column col : columns) {
+      if (col.getColumnName().equals(colName.toLowerCase())) {
+        return columnsByQialifiedName.get(col.getQualifiedName());
+      }
+    }
+    return -1;
+  }
+
+  public List<Column> getColumns() {
+    return ImmutableList.copyOf(columns);
+  }
+
+  public void setColumns(List<Column> columns) {
+    this.columns = columns;
+  }
+
+  public boolean contains(String colName) {
+    return columnsByQialifiedName.containsKey(colName.toLowerCase());
+
+  }
+
+  public boolean containsAll(Collection<Column> columns) {
+    return this.columns.containsAll(columns); // the parameter shadows the field; test against the field
+  }
+
+  public synchronized Partitions addColumn(String name, TajoDataTypes.Type type) {
+    if (type == TajoDataTypes.Type.CHAR) {
+      return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));
+    }
+    return addColumn(name, CatalogUtil.newSimpleDataType(type));
+  }
+
+  public synchronized Partitions addColumn(String name, TajoDataTypes.Type type, int length) {
+    return addColumn(name, CatalogUtil.newDataTypeWithLen(type, length));
+  }
+
+  public synchronized Partitions addColumn(String name, TajoDataTypes.DataType dataType) {
+    String normalized = name.toLowerCase();
+    if (columnsByQialifiedName.containsKey(normalized)) {
+      LOG.error("Already exists column " + normalized);
+      throw new AlreadyExistsFieldException(normalized);
+    }
+
+    Column newCol = new Column(normalized, dataType);
+    columns.add(newCol);
+    columnsByQialifiedName.put(newCol.getQualifiedName(), columns.size() - 1);
+    columnsByName.put(newCol.getColumnName(), TUtil.newList(columns.size() - 1));
+
+    return this;
+  }
+
+  public synchronized void addColumn(Column column) {
+    addColumn(column.getQualifiedName(), column.getDataType());
+  }
+
+  public synchronized void addColumns(Partitions schema) {
+    for (Column column : schema.getColumns()) {
+      addColumn(column);
+    }
+  }
+
+  public synchronized void addSpecifier(Specifier specifier) {
+    if(specifiers == null)
+      specifiers = TUtil.newList();
+
+    specifiers.add(specifier);
+  }
+
+  public CatalogProtos.PartitionsType getPartitionsType() {
+    return partitionsType;
+  }
+
+  public void setPartitionsType(CatalogProtos.PartitionsType partitionsType) {
+    this.partitionsType = partitionsType;
+  }
+
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  public List<Specifier> getSpecifiers() {
+    return specifiers;
+  }
+
+  public void setSpecifiers(List<Specifier> specifiers) {
+    this.specifiers = specifiers;
+  }
+
+  public Map<String, Integer> getColumnsByQialifiedName() {
+    return columnsByQialifiedName;
+  }
+
+  public void setColumnsByQialifiedName(Map<String, Integer> columnsByQialifiedName) {
+    this.columnsByQialifiedName = columnsByQialifiedName;
+  }
+
+  public Map<String, List<Integer>> getColumnsByName() {
+    return columnsByName;
+  }
+
+  public void setColumnsByName(Map<String, List<Integer>> columnsByName) {
+    this.columnsByName = columnsByName;
+  }
+
+  public boolean equals(Object o) {
+    if (o instanceof Partitions) {
+      Partitions other = (Partitions) o;
+      return getProto().equals(other.getProto());
+    }
+    return false;
+  }
+
+  public Object clone() throws CloneNotSupportedException {
+    Partitions clone = (Partitions) super.clone();
+    clone.builder = CatalogProtos.PartitionsProto.newBuilder();
+    clone.setPartitionsType(this.partitionsType);
+    clone.setColumns(this.columns);
+    clone.setNumPartitions(this.numPartitions);
+    // specifiers is optional and may be null; copying it unguarded would throw a NullPointerException.
+    clone.specifiers = this.specifiers == null ? null : new ArrayList<Specifier>(this.specifiers);
+    return clone;
+  }
+
+  @Override
+  public CatalogProtos.PartitionsProto getProto() {
+    if (builder == null) {
+      builder = CatalogProtos.PartitionsProto.newBuilder();
+    }
+    if (this.partitionsType != null) {
+      builder.setPartitionsType(this.partitionsType);
+    }
+    builder.clearColumns();
+    if (this.columns != null) {
+      for (Column col : columns) {
+        builder.addColumns(col.getProto());
+      }
+    }
+    builder.setNumPartitions(numPartitions);
+    builder.clearSpecifiers(); // builder is reused: reset the repeated field or each call appends duplicates
+    if (this.specifiers != null) {
+      for(Specifier specifier: specifiers) {
+        builder.addSpecifiers(specifier.getProto());
+      }
+    }
+    return builder.build();
+  }
+
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().
+        excludeFieldsWithoutExposeAnnotation().create();
+    return gson.toJson(this);
+  }
+
+  @Override
+  public String toJson() {
+    return CatalogGsonHelper.toJson(this, Partitions.class);
+
+  }
+
+  public Column[] toArray() {
+    return this.columns.toArray(new Column[this.columns.size()]);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
new file mode 100644
index 0000000..feb8a33
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.base.Objects;
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+public class Specifier implements ProtoObject<CatalogProtos.SpecifierProto>, Cloneable,
+    GsonObject {
+
+  private static final Log LOG = LogFactory.getLog(Specifier.class);
+  protected CatalogProtos.SpecifierProto.Builder builder = null;
+
+
+  @Expose protected String name;
+  @Expose protected String expressions;
+
+  public Specifier() {
+    builder = CatalogProtos.SpecifierProto.newBuilder();
+  }
+
+  public Specifier(String name) {
+    this();
+    this.name = name;
+  }
+
+  public Specifier(String name, String expressions) {
+    this();
+    this.name = name;
+    this.expressions = expressions;
+  }
+
+  public Specifier(CatalogProtos.SpecifierProto proto) {
+    this();
+    this.name = proto.getName().toLowerCase();
+    this.expressions = proto.getExpressions();
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public String getExpressions() {
+    return expressions;
+  }
+
+  public void setExpressions(String expressions) {
+    this.expressions = expressions;
+  }
+
+  public boolean equals(Object o) {
+    if (o instanceof Specifier) {
+      Specifier other = (Specifier)o;
+      boolean eq = TUtil.checkEquals(this.name, other.name);
+      eq = eq && TUtil.checkEquals(this.expressions, other.expressions);
+      return  eq;
+    }
+    return false;
+  }
+
+  public int hashCode() {
+    return Objects.hashCode(this.name, this.expressions);
+
+  }
+
+  public Object clone() throws CloneNotSupportedException {
+    Specifier clone = (Specifier) super.clone();
+    clone.builder = CatalogProtos.SpecifierProto.newBuilder();
+    clone.name = this.name;
+    clone.expressions = this.expressions;
+    return clone;
+  }
+
+  public String toString() {
+    Gson gson = CatalogGsonHelper.getPrettyInstance();
+    return gson.toJson(this);
+  }
+
+  @Override
+  public CatalogProtos.SpecifierProto getProto() {
+    if(builder == null) {
+      builder = CatalogProtos.SpecifierProto.newBuilder();
+    }
+
+    if(this.name != null) {
+      builder.setName(this.name);
+    }
+
+    if(this.expressions != null) {
+      builder.setExpressions(this.expressions);
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public String toJson() {
+    return CatalogGsonHelper.toJson(this, Specifier.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index acfa4fb..e5af491 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -50,6 +50,13 @@ enum CompressType {
     LZ = 6;
 }
 
+// Kind of partitioning applied to a table; carried in PartitionsProto.partitionsType.
+enum PartitionsType {
+    RANGE = 0;
+    HASH = 1;
+    LIST = 2;
+    COLUMN = 3;
+}
+
 message ColumnMetaProto {
     required DataType dataType = 1;
     required bool compressed = 2;
@@ -106,6 +113,7 @@ message TableDescProto {
 	required TableProto meta = 3;
 	required SchemaProto schema = 4;
 	optional TableStatsProto stats = 5;
+	optional PartitionsProto partitions = 6;
 }
 
 enum FunctionType {
@@ -225,3 +233,15 @@ message SortSpecProto {
   optional bool ascending = 2 [default = true];
   optional bool nullFirst = 3 [default = false];
 }
+
+// Partitioning description of a table; attached to TableDescProto as the
+// optional field "partitions".
+message PartitionsProto {
+  // Kind of partitioning (always present).
+  required PartitionsType partitionsType = 1;
+  // Columns associated with the partitioning.
+  repeated ColumnProto columns = 2;
+  // Number of partitions, when one is given.
+  optional int32 numPartitions = 3;
+  // Zero or more name/expressions entries; see SpecifierProto.
+  repeated SpecifierProto specifiers = 4;
+}
+
+// One entry of PartitionsProto.specifiers. Both fields are optional.
+message SpecifierProto {
+	// Name of the entry, if one was given.
+	optional string name = 1;
+	// Expressions text of the entry, if any.
+	optional string expressions = 2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml
index ca8a3ef..e1105eb 100644
--- a/tajo-catalog/tajo-catalog-server/pom.xml
+++ b/tajo-catalog/tajo-catalog-server/pom.xml
@@ -127,6 +127,10 @@
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-rpc</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-algebra</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>com.google.protobuf</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index dc279bc..e6566af 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -253,9 +253,12 @@ public class CatalogServer extends AbstractService {
         descBuilder.setMeta(tableDesc.getMeta());
         descBuilder.setSchema(tableDesc.getSchema());
 
+        if( tableDesc.getPartitions() != null
+            && !tableDesc.getPartitions().toString().isEmpty()) {
+          descBuilder.setPartitions(tableDesc.getPartitions());
+        }
 
         store.addTable(new TableDesc(descBuilder.build()));
-
       } catch (IOException ioe) {
         LOG.error(ioe.getMessage(), ioe);
         return BOOL_FALSE;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 6a76794..3414e83 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -26,6 +26,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -37,6 +40,8 @@ import org.apache.tajo.exception.InternalException;
 import java.io.IOException;
 import java.sql.*;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -47,6 +52,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
   protected String connectionPassword;
   protected String catalogUri;
   private Connection conn;
+  protected Map<String, Boolean> baseTableMaps = new HashMap<String, Boolean>();
 
   protected static final int VERSION = 1;
 
@@ -202,6 +208,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
   @Override
   public void addTable(final TableDesc table) throws IOException {
     Statement stmt = null;
+    PreparedStatement pstmt = null;
+
     ResultSet res;
 
     String sql =
@@ -245,7 +253,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
 
       String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
-      PreparedStatement pstmt = getConnection().prepareStatement(optSql);
+      pstmt = getConnection().prepareStatement(optSql);
       try {
         for (Entry<String, String> entry : table.getMeta().toMap().entrySet()) {
           pstmt.setString(1, table.getName());
@@ -272,9 +280,94 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
         stmt.addBatch(sql);
         stmt.executeBatch();
       }
+
+      //Partition
+      if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
+        try {
+          Partitions partitions = table.getPartitions();
+          List<Column> columnList = partitions.getColumns();
+
+          // Find the columns which are used for a partitioned table.
+          StringBuffer columns = new StringBuffer();
+          for(Column eachColumn : columnList) {
+            sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
+                + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(sql);
+            }
+            res = stmt.executeQuery(sql);
+            if (!res.next()) {
+              throw new IOException("ERROR: there is no columnId matched to "
+                  + table.getName());
+            }
+            columnId = res.getInt("column_id");
+
+            if (columns.length() > 0) {
+              columns.append(",");
+            }
+            columns.append(columnId);
+          }
+
+          // Set default partition name. But if user named to subpartition, it would be updated.
+//          String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
+
+          sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
+              + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
+          pstmt = getConnection().prepareStatement(sql);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sql);
+          }
+
+          // Find information for subpartitions
+          if (partitions.getSpecifiers() != null) {
+            int count = 1;
+            if (partitions.getSpecifiers().size() == 0) {
+              pstmt.clearParameters();
+              pstmt.setString(1, null);
+              pstmt.setInt(2, tid);
+              pstmt.setString(3, partitions.getPartitionsType().name());
+              pstmt.setInt(4, partitions.getNumPartitions());
+              pstmt.setString(5, columns.toString());
+              pstmt.setString(6, null);
+              pstmt.addBatch();
+            } else {
+              for(Specifier specifier: partitions.getSpecifiers()) {
+                pstmt.clearParameters();
+                if (specifier.getName() != null && !specifier.getName().equals("")) {
+                  pstmt.setString(1, specifier.getName());
+                } else {
+                  pstmt.setString(1, null);
+                }
+                pstmt.setInt(2, tid);
+                pstmt.setString(3, partitions.getPartitionsType().name());
+                pstmt.setInt(4, partitions.getNumPartitions());
+                pstmt.setString(5, columns.toString());
+                pstmt.setString(6, specifier.getExpressions());
+                pstmt.addBatch();
+                count++;
+              }
+            }
+          } else {
+            pstmt.clearParameters();
+            pstmt.setString(1, null);
+            pstmt.setInt(2, tid);
+            pstmt.setString(3, partitions.getPartitionsType().name());
+            pstmt.setInt(4, partitions.getNumPartitions());
+            pstmt.setString(5, columns.toString());
+            pstmt.setString(6, null);
+            pstmt.addBatch();
+          }
+          pstmt.executeBatch();
+        } finally {
+          CatalogUtil.closeSQLWrapper(pstmt);
+        }
+      }
+
     } catch (SQLException se) {
       throw new IOException(se.getMessage(), se);
     } finally {
+      CatalogUtil.closeSQLWrapper(pstmt);
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
@@ -365,6 +458,20 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       CatalogUtil.closeSQLWrapper(stmt);
     }
 
+
+    try {
+      sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
+        + " SELECT TID FROM " + TB_TABLES
+        + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+      LOG.info(sql);
+      stmt = getConnection().createStatement();
+      stmt.execute(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+
     try {
       sql = "DELETE FROM " + TB_TABLES +
           " WHERE " + C_TABLE_ID + " = '" + name + "'";
@@ -376,6 +483,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     } finally {
       CatalogUtil.closeSQLWrapper(stmt);
     }
+
   }
 
   @Override
@@ -388,10 +496,12 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     StoreType storeType = null;
     Options options;
     TableStats stat = null;
+    Partitions partitions = null;
+    int tid = 0;
 
     try {
       String sql =
-          "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+          "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
               + " WHERE " + C_TABLE_ID + "='" + name + "'";
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
@@ -404,6 +514,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       tableName = res.getString(C_TABLE_ID).trim();
       path = new Path(res.getString("path").trim());
       storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+      tid = res.getInt("TID");
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
@@ -481,15 +592,84 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
 
+    try {
+      String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
+          + " WHERE TID =" + tid + "";
+      stmt = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+
+      while (res.next()) {
+        if (partitions == null) {
+          partitions = new Partitions();
+          String[] columns = res.getString("columns").split(",");
+          for(String eachColumn: columns) {
+            partitions.addColumn(getColumn(tableName, tid, eachColumn));
+          }
+          partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
+          partitions.setNumPartitions(res.getInt("quantity"));
+        }
+
+        Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
+        partitions.addSpecifier(specifier);
+      }
+
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
     TableMeta meta = new TableMeta(storeType, options);
     TableDesc table = new TableDesc(tableName, schema, meta, path);
     if (stat != null) {
       table.setStats(stat);
     }
 
+    if (partitions != null) {
+      table.setPartitions(partitions);
+    }
+
     return table;
   }
 
+  /**
+   * Looks up a single column of a table in the columns catalog table.
+   *
+   * @param tableName table name; prepended (with a dot) to the stored column name
+   * @param tid       value matched against the TID column
+   * @param columnId  value matched against the column_id column; inserted into
+   *                  the SQL text as given
+   * @return the matching Column, or null when the query returns no row
+   * @throws IOException wrapping any SQLException raised while querying
+   */
+  private Column getColumn(String tableName, int tid, String columnId) throws IOException {
+    final String query = "SELECT column_name, data_type, type_length from "
+        + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
+
+    Statement statement = null;
+    ResultSet rows = null;
+    try {
+      statement = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(query);
+      }
+      rows = statement.executeQuery(query);
+
+      // No matching row: the caller receives null.
+      if (!rows.next()) {
+        return null;
+      }
+
+      String qualifiedName = tableName + "." + rows.getString("column_name").trim();
+      Type type = getDataType(rows.getString("data_type").trim());
+      int length = rows.getInt("type_length");
+
+      // A positive type_length selects the three-argument Column constructor.
+      return length > 0
+          ? new Column(qualifiedName, type, length)
+          : new Column(qualifiedName, type);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(rows, statement);
+    }
+  }
+
   private Type getDataType(final String typeStr) {
     try {
       return Enum.valueOf(Type.class, typeStr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index 6fee78b..06a701b 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -24,6 +24,9 @@ package org.apache.tajo.catalog.store;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -38,6 +41,7 @@ import java.sql.*;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -68,143 +72,177 @@ public class DerbyStore extends AbstractDBStore {
     try {
       // META
       stmt = getConnection().createStatement();
-      String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(meta_ddl);
+
+      if (!baseTableMaps.get(TB_META)) {
+        String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(meta_ddl);
+        }
+        stmt.executeUpdate(meta_ddl);
+        LOG.info("Table '" + TB_META + " is created.");
       }
-      stmt.executeUpdate(meta_ddl);
-      LOG.info("Table '" + TB_META + " is created.");
 
       // TABLES
-      String tables_ddl = "CREATE TABLE "
-          + TB_TABLES + " ("
-          + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
-          + "path VARCHAR(1024), "
-          + "store_type CHAR(16), "
-          + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
-          ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(tables_ddl);
-      }
-      stmt.addBatch(tables_ddl);
-      String idx_tables_tid = 
-          "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_tables_tid);
-      }
-      stmt.addBatch(idx_tables_tid);
-      
-      String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on " 
-          + TB_TABLES + "(" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_tables_name);
+      if (!baseTableMaps.get(TB_TABLES)) {
+        String tables_ddl = "CREATE TABLE "
+            + TB_TABLES + " ("
+            + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
+            + "path VARCHAR(1024), "
+            + "store_type CHAR(16), "
+            + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
+            ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(tables_ddl);
+        }
+        stmt.addBatch(tables_ddl);
+
+        String idx_tables_tid =
+            "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_tables_tid);
+        }
+        stmt.addBatch(idx_tables_tid);
+
+        String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on "
+            + TB_TABLES + "(" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_tables_name);
+        }
+        stmt.addBatch(idx_tables_name);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_TABLES + "' is created.");
       }
-      stmt.addBatch(idx_tables_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_TABLES + "' is created.");
 
       // COLUMNS
-      String columns_ddl = 
-          "CREATE TABLE " + TB_COLUMNS + " ("
-          + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
-          + C_TABLE_ID + ") ON DELETE CASCADE, "
-          + "column_id INT NOT NULL,"
-          + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
-          + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(columns_ddl);
-      }
-      stmt.addBatch(columns_ddl);
+      if (!baseTableMaps.get(TB_COLUMNS)) {
+        String columns_ddl =
+            "CREATE TABLE " + TB_COLUMNS + " ("
+                + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
+                + C_TABLE_ID + ") ON DELETE CASCADE, "
+                + "column_id INT NOT NULL,"
+                + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+                + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(columns_ddl);
+        }
+        stmt.addBatch(columns_ddl);
 
-      String idx_fk_columns_table_name = 
-          "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
-          + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_fk_columns_table_name);
+        String idx_fk_columns_table_name =
+            "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
+                + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_fk_columns_table_name);
+        }
+        stmt.addBatch(idx_fk_columns_table_name);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_COLUMNS + " is created.");
       }
-      stmt.addBatch(idx_fk_columns_table_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_COLUMNS + " is created.");
 
       // OPTIONS
-      String options_ddl = 
-          "CREATE TABLE " + TB_OPTIONS +" ("
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
-          + "ON DELETE CASCADE, "
-          + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(options_ddl);
-      }
-      stmt.addBatch(options_ddl);
-      
-      String idx_options_key = 
-          "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_options_key);
-      }
-      stmt.addBatch(idx_options_key);
-      String idx_options_table_name = 
-          "CREATE INDEX idx_options_table_name on " + TB_OPTIONS 
-          + "(" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_options_table_name);
+      if (!baseTableMaps.get(TB_OPTIONS)) {
+        String options_ddl =
+            "CREATE TABLE " + TB_OPTIONS +" ("
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
+                + "ON DELETE CASCADE, "
+                + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(options_ddl);
+        }
+        stmt.addBatch(options_ddl);
+
+        String idx_options_key =
+            "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_options_key);
+        }
+        stmt.addBatch(idx_options_key);
+        String idx_options_table_name =
+            "CREATE INDEX idx_options_table_name on " + TB_OPTIONS
+                + "(" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_options_table_name);
+        }
+        stmt.addBatch(idx_options_table_name);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_OPTIONS + " is created.");
       }
-      stmt.addBatch(idx_options_table_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_OPTIONS + " is created.");
-      
+
       // INDEXES
-      String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
-          + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
-          + "ON DELETE CASCADE, "
-          + "column_name VARCHAR(255) NOT NULL, "
-          + "data_type VARCHAR(255) NOT NULL, "
-          + "index_type CHAR(32) NOT NULL, "
-          + "is_unique BOOLEAN NOT NULL, "
-          + "is_clustered BOOLEAN NOT NULL, "
-          + "is_ascending BOOLEAN NOT NULL)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(indexes_ddl);
+      if (!baseTableMaps.get(TB_INDEXES)) {
+        String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
+            + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+            + "ON DELETE CASCADE, "
+            + "column_name VARCHAR(255) NOT NULL, "
+            + "data_type VARCHAR(255) NOT NULL, "
+            + "index_type CHAR(32) NOT NULL, "
+            + "is_unique BOOLEAN NOT NULL, "
+            + "is_clustered BOOLEAN NOT NULL, "
+            + "is_ascending BOOLEAN NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(indexes_ddl);
+        }
+        stmt.addBatch(indexes_ddl);
+
+        String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON "
+            + TB_INDEXES + " (index_name)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_indexes_key);
+        }
+        stmt.addBatch(idx_indexes_key);
+
+        String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON "
+            + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_indexes_columns);
+        }
+        stmt.addBatch(idx_indexes_columns);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_INDEXES + "' is created.");
       }
-      stmt.addBatch(indexes_ddl);
-      
-      String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON " 
-          + TB_INDEXES + " (index_name)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_indexes_key);
-      }      
-      stmt.addBatch(idx_indexes_key);
-      
-      String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON " 
-          + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_indexes_columns);
-      } 
-      stmt.addBatch(idx_indexes_columns);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_INDEXES + "' is created.");
 
-      String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
-          + "ON DELETE CASCADE, "
-          + "num_rows BIGINT, "
-          + "num_bytes BIGINT)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(stats_ddl);
+      // STATISTICS
+      if (!baseTableMaps.get(TB_STATISTICS)) {
+        String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+            + "ON DELETE CASCADE, "
+            + "num_rows BIGINT, "
+            + "num_bytes BIGINT)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(stats_ddl);
+        }
+        stmt.addBatch(stats_ddl);
+
+        String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
+            + TB_STATISTICS + " (" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_stats_fk_table_name);
+        }
+        stmt.addBatch(idx_stats_fk_table_name);
+        // Execute the queued DDL here, as every other section does. Otherwise it
+        // only runs if a later section happens to call executeBatch(), and the
+        // message below would be logged before anything was created.
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_STATISTICS + "' is created.");
+      }
-      stmt.addBatch(stats_ddl);
 
-      String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
-          + TB_STATISTICS + " (" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_stats_fk_table_name);
+      // PARTITION
+      if (!baseTableMaps.get(TB_PARTTIONS)) {
+        String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+            + "PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+            + "name VARCHAR(255), "
+            + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+            + "type VARCHAR(10) NOT NULL,"
+            + "quantity INT ,"
+            + "columns VARCHAR(255),"
+            + "expressions VARCHAR(1024)"
+            + ", CONSTRAINT PARTITION_PK PRIMARY KEY (PID)"
+            + "   )";
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(partition_ddl);
+        }
+        stmt.addBatch(partition_ddl);
+        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+        stmt.executeBatch();
       }
-      stmt.addBatch(idx_stats_fk_table_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_STATISTICS + "' is created.");
 
     } finally {
       wlock.unlock();
@@ -220,26 +258,34 @@ public class DerbyStore extends AbstractDBStore {
 
     wlock.lock();
     ResultSet res = null;
+    int foundCount = 0;
     try {
-      boolean found = false;
       res = getConnection().getMetaData().getTables(null, null, null,
           new String [] {"TABLE"});
-      
-      String resName;
-      while (res.next() && !found) {
-        resName = res.getString("TABLE_NAME");
-        if (TB_META.equals(resName)
-            || TB_TABLES.equals(resName)
-            || TB_COLUMNS.equals(resName)
-            || TB_OPTIONS.equals(resName)) {
-            return true;
-        }
+
+      baseTableMaps.put(TB_META, false);
+      baseTableMaps.put(TB_TABLES, false);
+      baseTableMaps.put(TB_COLUMNS, false);
+      baseTableMaps.put(TB_OPTIONS, false);
+      baseTableMaps.put(TB_STATISTICS, false);
+      baseTableMaps.put(TB_INDEXES, false);
+      baseTableMaps.put(TB_PARTTIONS, false);
+
+      while (res.next()) {
+        baseTableMaps.put(res.getString("TABLE_NAME"), true);
       }
     } finally {
       wlock.unlock();
       CatalogUtil.closeSQLWrapper(res);
-    }    
-    return false;
+    }
+
+    for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
+      if (!entry.getValue()) {
+        return false;
+      }
+    }
+
+    return true;
   }
   
   final boolean checkInternalTable(final String tableName) throws SQLException {
@@ -263,6 +309,7 @@ public class DerbyStore extends AbstractDBStore {
   
   @Override
   public final void addTable(final TableDesc table) throws IOException {
+    PreparedStatement pstmt = null;
     Statement stmt = null;
     ResultSet res = null;
 
@@ -324,10 +371,95 @@ public class DerbyStore extends AbstractDBStore {
         }
         stmt.executeUpdate(sql);
       }
+
+      //Partition
+      if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
+        try {
+          Partitions partitions = table.getPartitions();
+          List<Column> columnList = partitions.getColumns();
+
+          // Find the columns which are used for a partitioned table.
+          StringBuffer columns = new StringBuffer();
+          for(Column eachColumn : columnList) {
+            sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
+                + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(sql);
+            }
+            res = stmt.executeQuery(sql);
+            if (!res.next()) {
+              throw new IOException("ERROR: there is no columnId matched to "
+                  + table.getName());
+            }
+            columnId = res.getInt("column_id");
+
+            if(columns.length() > 0) {
+              columns.append(",");
+            }
+            columns.append(columnId);
+          }
+
+          // Set default partition name. But if user named to subpartition, it would be updated.
+//          String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
+
+          sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
+              + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
+          pstmt = getConnection().prepareStatement(sql);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sql);
+          }
+
+          // Find information for subpartitions
+          if (partitions.getSpecifiers() != null) {
+            int count = 1;
+            if (partitions.getSpecifiers().size() == 0) {
+              pstmt.clearParameters();
+              pstmt.setString(1, null);
+              pstmt.setInt(2, tid);
+              pstmt.setString(3, partitions.getPartitionsType().name());
+              pstmt.setInt(4, partitions.getNumPartitions());
+              pstmt.setString(5, columns.toString());
+              pstmt.setString(6, null);
+              pstmt.addBatch();
+            } else {
+              for(Specifier eachValue: partitions.getSpecifiers()) {
+                pstmt.clearParameters();
+                if (eachValue.getName() != null && !eachValue.getName().equals("")) {
+                  pstmt.setString(1, eachValue.getName());
+                } else {
+                  pstmt.setString(1, null);
+                }
+                pstmt.setInt(2, tid);
+                pstmt.setString(3, partitions.getPartitionsType().name());
+                pstmt.setInt(4, partitions.getNumPartitions());
+                pstmt.setString(5, columns.toString());
+                pstmt.setString(6, eachValue.getExpressions());
+                pstmt.addBatch();
+                count++;
+              }
+            }
+          } else {
+            pstmt.clearParameters();
+            pstmt.setString(1, null);
+            pstmt.setInt(2, tid);
+            pstmt.setString(3, partitions.getPartitionsType().name());
+            pstmt.setInt(4, partitions.getNumPartitions());
+            pstmt.setString(5, columns.toString());
+            pstmt.setString(6, null);
+            pstmt.addBatch();
+          }
+          pstmt.executeBatch();
+        } finally {
+          CatalogUtil.closeSQLWrapper(pstmt);
+        }
+      }
+
     } catch (SQLException se) {
       throw new IOException(se.getMessage(), se);
     } finally {
       wlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, pstmt);
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
   }
@@ -428,6 +560,17 @@ public class DerbyStore extends AbstractDBStore {
       }
 
       try {
+        sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
+            + " SELECT TID FROM " + TB_TABLES
+            + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+        LOG.info(sql);
+        stmt = getConnection().createStatement();
+        stmt.execute(sql);
+      } catch (SQLException se) {
+        throw new IOException(se);
+      }
+
+      try {
         sql = "DELETE FROM " + TB_TABLES +
             " WHERE " + C_TABLE_ID +" = '" + name + "'";
         LOG.info(sql);
@@ -435,7 +578,6 @@ public class DerbyStore extends AbstractDBStore {
       } catch (SQLException se) {
         throw new IOException(se);
       }
-
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
@@ -454,6 +596,8 @@ public class DerbyStore extends AbstractDBStore {
     StoreType storeType = null;
     Options options;
     TableStats stat = null;
+    Partitions partitions = null;
+    int tid = 0;
 
     try {
       rlock.lock();
@@ -461,7 +605,7 @@ public class DerbyStore extends AbstractDBStore {
 
       try {
         String sql = 
-            "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+            "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
             + " WHERE " + C_TABLE_ID + "='" + name + "'";
         if (LOG.isDebugEnabled()) {
           LOG.debug(sql);
@@ -474,6 +618,7 @@ public class DerbyStore extends AbstractDBStore {
         tableName = res.getString(C_TABLE_ID).trim();
         path = new Path(res.getString("path").trim());
         storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+        tid = res.getInt("TID");
       } catch (SQLException se) { 
         throw new IOException(se);
       } finally {
@@ -550,12 +695,48 @@ public class DerbyStore extends AbstractDBStore {
         CatalogUtil.closeSQLWrapper(res);
       }
 
+
+      try {
+        String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
+            + " WHERE TID =" + tid + "";
+        stmt = getConnection().createStatement();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        res = stmt.executeQuery(sql);
+
+        while (res.next()) {
+          if (partitions == null) {
+            partitions = new Partitions();
+            String[] columns = res.getString("columns").split(",");
+            for(String eachColumn: columns) {
+              partitions.addColumn(getColumn(tableName, tid, eachColumn));
+            }
+            partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
+                ("type")));
+            partitions.setNumPartitions(res.getInt("quantity"));
+          }
+
+          Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
+          partitions.addSpecifier(specifier);
+        }
+
+      } catch (SQLException se) {
+        throw new IOException(se);
+      } finally {
+        CatalogUtil.closeSQLWrapper(res, stmt);
+      }
+
       TableMeta meta = new TableMeta(storeType, options);
       TableDesc table = new TableDesc(tableName, schema, meta, path);
       if (stat != null) {
         table.setStats(stat);
       }
 
+      if (partitions != null) {
+        table.setPartitions(partitions);
+      }
+
       return table;
     } catch (SQLException se) {
       throw new IOException(se);
@@ -564,7 +745,42 @@ public class DerbyStore extends AbstractDBStore {
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
-  
+
+  private Column getColumn(String tableName, int tid, String columnId) throws IOException {
+    ResultSet res = null;
+    Column column = null;
+    Statement stmt = null;
+
+    try {
+      String sql = "SELECT column_name, data_type, type_length from "
+          + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId;
+
+      stmt = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+
+      if (res.next()) {
+        String columnName = tableName + "."
+            + res.getString("column_name").trim();
+        Type dataType = getDataType(res.getString("data_type")
+            .trim());
+        int typeLength = res.getInt("type_length");
+        if (typeLength > 0) {
+          column = new Column(columnName, dataType, typeLength);
+        } else {
+          column = new Column(columnName, dataType);
+        }
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+    return column;
+  }
+
   private Type getDataType(final String typeStr) {
     try {
     return Enum.valueOf(Type.class, typeStr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
index 259d9d6..c368969 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -29,6 +29,7 @@ import org.apache.tajo.exception.InternalException;
 import java.io.IOException;
 import java.sql.*;
 import java.util.List;
+import java.util.Map;
 
 public class MySQLStore extends AbstractDBStore  {
 
@@ -51,115 +52,158 @@ public class MySQLStore extends AbstractDBStore  {
   // TODO - DDL and index statements should be renamed
   protected void createBaseTable() throws SQLException {
 
-    // META
-    Statement stmt = getConnection().createStatement();
-    String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(meta_ddl);
-    }
+    int result;
+    Statement stmt = null;
     try {
-      int result = stmt.executeUpdate(meta_ddl);
-      LOG.info("Table '" + TB_META + " is created.");
+      stmt = getConnection().createStatement();
+
+      // META
+      if (!baseTableMaps.get(TB_META)) {
+        String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(meta_ddl);
+        }
+        result = stmt.executeUpdate(meta_ddl);
+        LOG.info("Table '" + TB_META + " is created.");
+      }
 
       // TABLES
-      String tables_ddl = "CREATE TABLE "
-          + TB_TABLES + " ("
-          + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
-          + "path TEXT, "
-          + "store_type CHAR(16)"
-          + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(tables_ddl);
+      if (!baseTableMaps.get(TB_TABLES)) {
+        String tables_ddl = "CREATE TABLE "
+            + TB_TABLES + " ("
+            + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
+            + "path TEXT, "
+            + "store_type CHAR(16)"
+            + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(tables_ddl);
+        }
+
+        LOG.info("Table '" + TB_TABLES + "' is created.");
+        result = stmt.executeUpdate(tables_ddl);
       }
 
-      LOG.info("Table '" + TB_TABLES + "' is created.");
-      result = stmt.executeUpdate(tables_ddl);
       // COLUMNS
+      if (!baseTableMaps.get(TB_COLUMNS)) {
+        String columns_ddl =
+            "CREATE TABLE " + TB_COLUMNS + " ("
+                + "TID INT NOT NULL,"
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+                + "column_id INT NOT NULL,"
+                + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+                + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
+                + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
+                + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(columns_ddl);
+        }
 
-      String columns_ddl =
-          "CREATE TABLE " + TB_COLUMNS + " ("
-              + "TID INT NOT NULL,"
-              + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-              + "column_id INT NOT NULL,"
-              + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
-              + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
-              + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
-              + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(columns_ddl);
+        LOG.info("Table '" + TB_COLUMNS + " is created.");
+        result = stmt.executeUpdate(columns_ddl);
       }
 
-      LOG.info("Table '" + TB_COLUMNS + " is created.");
-      result = stmt.executeUpdate(columns_ddl);
       // OPTIONS
-
-      String options_ddl =
-          "CREATE TABLE " + TB_OPTIONS + " ("
-              + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-              + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
-              + "INDEX("+C_TABLE_ID+", key_),"
-              + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(options_ddl);
+      if (!baseTableMaps.get(TB_OPTIONS)) {
+        String options_ddl =
+            "CREATE TABLE " + TB_OPTIONS + " ("
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+                + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
+                + "INDEX("+C_TABLE_ID+", key_),"
+                + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(options_ddl);
+        }
+        LOG.info("Table '" + TB_OPTIONS + " is created.");
+        result = stmt.executeUpdate(options_ddl);
       }
-      LOG.info("Table '" + TB_OPTIONS + " is created.");
-      result = stmt.executeUpdate(options_ddl);
+
       // INDEXES
+      if (!baseTableMaps.get(TB_INDEXES)) {
+        String indexes_ddl = "CREATE TABLE " + TB_INDEXES + "("
+            + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+            + "column_name VARCHAR(255) NOT NULL, "
+            + "data_type VARCHAR(255) NOT NULL, "
+            + "index_type CHAR(32) NOT NULL, "
+            + "is_unique BOOLEAN NOT NULL, "
+            + "is_clustered BOOLEAN NOT NULL, "
+            + "is_ascending BOOLEAN NOT NULL,"
+            + "INDEX(" + C_TABLE_ID + ", column_name),"
+            + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(indexes_ddl);
+        }
+        LOG.info("Table '" + TB_INDEXES + "' is created.");
+        result = stmt.executeUpdate(indexes_ddl);
+      }
 
-      String indexes_ddl = "CREATE TABLE " + TB_INDEXES + "("
-          + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-          + "column_name VARCHAR(255) NOT NULL, "
-          + "data_type VARCHAR(255) NOT NULL, "
-          + "index_type CHAR(32) NOT NULL, "
-          + "is_unique BOOLEAN NOT NULL, "
-          + "is_clustered BOOLEAN NOT NULL, "
-          + "is_ascending BOOLEAN NOT NULL,"
-          + "INDEX(" + C_TABLE_ID + ", column_name),"
-          + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(indexes_ddl);
+      if (!baseTableMaps.get(TB_STATISTICS)) {
+        String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+            + "num_rows BIGINT, "
+            + "num_bytes BIGINT,"
+            + "INDEX("+C_TABLE_ID+"),"
+            + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(stats_ddl);
+        }
+        LOG.info("Table '" + TB_STATISTICS + "' is created.");
+        result = stmt.executeUpdate(stats_ddl);
       }
-      LOG.info("Table '" + TB_INDEXES + "' is created.");
-      result = stmt.executeUpdate(indexes_ddl);
-
-      String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-          + "num_rows BIGINT, "
-          + "num_bytes BIGINT,"
-          + "INDEX("+C_TABLE_ID+"),"
-          + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(stats_ddl);
+
+      // PARTITION
+      if (!baseTableMaps.get(TB_PARTTIONS)) {
+        String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+            + "PID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+            + "name VARCHAR(255), "
+            + "TID INT NOT NULL,"
+            + "type VARCHAR(10) NOT NULL,"
+            + "quantity INT ,"
+            + "columns VARCHAR(255),"
+            + "expressions TEXT )";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(partition_ddl);
+        }
+        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+        result = stmt.executeUpdate(partition_ddl);
       }
-      LOG.info("Table '" + TB_STATISTICS + "' is created.");
-      result = stmt.executeUpdate(stats_ddl);
     } finally {
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
 
   protected boolean isInitialized() throws SQLException {
-    boolean found = false;
     ResultSet res = getConnection().getMetaData().getTables(null, null, null,
         new String[]{"TABLE"});
 
-    String resName;
     try {
-      while (res.next() && !found) {
-        resName = res.getString("TABLE_NAME");
-        if (TB_META.equals(resName)
-            || TB_TABLES.equals(resName)
-            || TB_COLUMNS.equals(resName)
-            || TB_OPTIONS.equals(resName)) {
-          return true;
-        }
+      baseTableMaps.put(TB_META, false);
+      baseTableMaps.put(TB_TABLES, false);
+      baseTableMaps.put(TB_COLUMNS, false);
+      baseTableMaps.put(TB_OPTIONS, false);
+      baseTableMaps.put(TB_STATISTICS, false);
+      baseTableMaps.put(TB_INDEXES, false);
+      baseTableMaps.put(TB_PARTTIONS, false);
+
+      if (res.wasNull())
+        return false;
+
+      while (res.next()) {
+        baseTableMaps.put(res.getString("TABLE_NAME"), true);
       }
     } finally {
       CatalogUtil.closeSQLWrapper(res);
     }
-    return false;
+
+    for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
+      if (!entry.getValue()) {
+        return false;
+      }
+    }
+
+    return  true;
+//    return false;
   }
 
   @Override