You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2013/12/03 10:29:41 UTC

[1/2] TAJO-284: Add table partitioning entry to Catalog. (jaehwa)

Updated Branches:
  refs/heads/master 29a0aa01c -> 0b0de13b2


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index b824756..d174e72 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -20,6 +20,9 @@ package org.apache.tajo.catalog;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.function.Function;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -70,19 +73,19 @@ public class TestCatalog {
 		schema1.addColumn(FieldName2, Type.INT4);
 		schema1.addColumn(FieldName3, Type.INT8);
     Path path = new Path(CommonTestingUtil.getTestDir(), "table1");
-		TableDesc meta = CatalogUtil.newTableDesc(
+    TableDesc meta = CatalogUtil.newTableDesc(
         "getTable",
         schema1,
         StoreType.CSV,
         new Options(),
         path);
-		
-		assertFalse(catalog.existsTable("getTable"));
-		catalog.addTable(meta);
-		assertTrue(catalog.existsTable("getTable"));
-		
-		catalog.deleteTable("getTable");
+
 		assertFalse(catalog.existsTable("getTable"));
+    catalog.addTable(meta);
+    assertTrue(catalog.existsTable("getTable"));
+
+    catalog.deleteTable("getTable");
+    assertFalse(catalog.existsTable("getTable"));
 	}
 	
 	@Test(expected = Throwable.class)
@@ -197,4 +200,212 @@ public class TestCatalog {
     catalog.createFunction(overload);
     assertTrue(catalog.containFunction("test3", CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.BLOB)));
   }
+
+  /**
+   * Registers a HASH-partitioned table (two partitions on "id", no explicit specifiers),
+   * reads it back through the catalog, verifies the partition metadata and drops the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByHash1() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitions.setNumPartitions(2);
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+
+    assertFalse(catalog.existsTable(tableName));
+    catalog.addTable(desc);
+    try {
+      assertTrue(catalog.existsTable(tableName));
+      TableDesc retrieved = catalog.getTableDesc(tableName);
+
+      // JUnit convention: expected value first, actual value second.
+      assertEquals(tableName, retrieved.getName());
+      assertEquals(CatalogProtos.PartitionsType.HASH, retrieved.getPartitions().getPartitionsType());
+      assertEquals("id", retrieved.getPartitions().getColumn(0).getColumnName());
+      assertEquals(2, retrieved.getPartitions().getNumPartitions());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be dropped even when an assertion above fails.
+      catalog.deleteTable(tableName);
+    }
+    assertFalse(catalog.existsTable(tableName));
+  }
+
+
+  /**
+   * Registers a HASH-partitioned table with three named specifiers, reads it back through
+   * the catalog, verifies the partition metadata including specifier names and order, and
+   * drops the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByHash2() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitions.setNumPartitions(2);
+
+    partitions.addSpecifier(new Specifier("sub_part1"));
+    partitions.addSpecifier(new Specifier("sub_part2"));
+    partitions.addSpecifier(new Specifier("sub_part3"));
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(catalog.existsTable(tableName));
+    catalog.addTable(desc);
+    try {
+      assertTrue(catalog.existsTable(tableName));
+
+      TableDesc retrieved = catalog.getTableDesc(tableName);
+
+      // JUnit convention: expected value first, actual value second.
+      assertEquals(tableName, retrieved.getName());
+      assertEquals(CatalogProtos.PartitionsType.HASH, retrieved.getPartitions().getPartitionsType());
+      assertEquals("id", retrieved.getPartitions().getColumn(0).getColumnName());
+      assertEquals(2, retrieved.getPartitions().getNumPartitions());
+      assertEquals("sub_part1", retrieved.getPartitions().getSpecifiers().get(0).getName());
+      assertEquals("sub_part2", retrieved.getPartitions().getSpecifiers().get(1).getName());
+      assertEquals("sub_part3", retrieved.getPartitions().getSpecifiers().get(2).getName());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be dropped even when an assertion above fails.
+      catalog.deleteTable(tableName);
+    }
+    assertFalse(catalog.existsTable(tableName));
+  }
+
+  /**
+   * Registers a LIST-partitioned table whose two specifiers carry comma-separated value
+   * lists (including non-ASCII text), reads it back through the catalog, verifies the
+   * specifier names and expressions, and drops the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByList() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+
+    partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+    partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(catalog.existsTable(tableName));
+    catalog.addTable(desc);
+    try {
+      assertTrue(catalog.existsTable(tableName));
+
+      TableDesc retrieved = catalog.getTableDesc(tableName);
+
+      // JUnit convention: expected value first, actual value second.
+      assertEquals(tableName, retrieved.getName());
+      assertEquals(CatalogProtos.PartitionsType.LIST, retrieved.getPartitions().getPartitionsType());
+      assertEquals("id", retrieved.getPartitions().getColumn(0).getColumnName());
+      assertEquals("sub_part1", retrieved.getPartitions().getSpecifiers().get(0).getName());
+      assertEquals("Seoul,서울", retrieved.getPartitions().getSpecifiers().get(0).getExpressions());
+      assertEquals("sub_part2", retrieved.getPartitions().getSpecifiers().get(1).getName());
+      assertEquals("Busan,부산", retrieved.getPartitions().getSpecifiers().get(1).getExpressions());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be dropped even when an assertion above fails.
+      catalog.deleteTable(tableName);
+    }
+    assertFalse(catalog.existsTable(tableName));
+  }
+
+  /**
+   * Registers a RANGE-partitioned table with two bounded specifiers and one specifier
+   * without an expression, reads it back through the catalog, verifies names and
+   * expressions (the unbounded one is expected to come back as an empty string), and
+   * drops the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByRange() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+
+    partitions.addSpecifier(new Specifier("sub_part1", "2"));
+    partitions.addSpecifier(new Specifier("sub_part2", "5"));
+    partitions.addSpecifier(new Specifier("sub_part3"));
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(catalog.existsTable(tableName));
+    catalog.addTable(desc);
+    try {
+      assertTrue(catalog.existsTable(tableName));
+
+      TableDesc retrieved = catalog.getTableDesc(tableName);
+
+      // JUnit convention: expected value first, actual value second.
+      assertEquals(tableName, retrieved.getName());
+      assertEquals(CatalogProtos.PartitionsType.RANGE, retrieved.getPartitions().getPartitionsType());
+      assertEquals("id", retrieved.getPartitions().getColumn(0).getColumnName());
+      assertEquals("sub_part1", retrieved.getPartitions().getSpecifiers().get(0).getName());
+      assertEquals("2", retrieved.getPartitions().getSpecifiers().get(0).getExpressions());
+      assertEquals("sub_part2", retrieved.getPartitions().getSpecifiers().get(1).getName());
+      assertEquals("5", retrieved.getPartitions().getSpecifiers().get(1).getExpressions());
+      assertEquals("sub_part3", retrieved.getPartitions().getSpecifiers().get(2).getName());
+      assertEquals("", retrieved.getPartitions().getSpecifiers().get(2).getExpressions());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be dropped even when an assertion above fails.
+      catalog.deleteTable(tableName);
+    }
+    assertFalse(catalog.existsTable(tableName));
+  }
+
+  /**
+   * Registers a COLUMN-partitioned table on "id", reads it back through the catalog,
+   * verifies the partition type and column, and drops the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByColumn() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(catalog.existsTable(tableName));
+    catalog.addTable(desc);
+    try {
+      assertTrue(catalog.existsTable(tableName));
+
+      TableDesc retrieved = catalog.getTableDesc(tableName);
+
+      // JUnit convention: expected value first, actual value second.
+      assertEquals(tableName, retrieved.getName());
+      assertEquals(CatalogProtos.PartitionsType.COLUMN, retrieved.getPartitions().getPartitionsType());
+      assertEquals("id", retrieved.getPartitions().getColumn(0).getColumnName());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be dropped even when an assertion above fails.
+      catalog.deleteTable(tableName);
+    }
+    assertFalse(catalog.existsTable(tableName));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index 260b4c7..d3671b3 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,6 +22,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.catalog.store.AbstractDBStore;
@@ -219,4 +222,175 @@ public class TestDBStore {
           s2.getColumn(i).getColumnName());
     }
   }
+
+  /**
+   * Stores a HASH-partitioned table (two partitions on "id"), reads it back from the
+   * store, checks that the schema column order survived, and deletes the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByHash1() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitions.setNumPartitions(2);
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(store.existTable(tableName));
+    store.addTable(desc);
+    try {
+      assertTrue(store.existTable(tableName));
+
+      TableDesc retrieved = store.getTable(tableName);
+
+      // Schema order check
+      // NOTE(review): the partition metadata of 'retrieved' is not asserted here; consider
+      // verifying type/columns/number of partitions once the store round-trip is confirmed.
+      assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be removed even when a check above fails.
+      store.deleteTable(tableName);
+    }
+    assertFalse(store.existTable(tableName));
+  }
+
+  /**
+   * Stores a HASH-partitioned table with three named specifiers, reads it back from the
+   * store, checks that the schema column order survived, and deletes the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByHash2() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitions.setNumPartitions(2);
+
+    partitions.addSpecifier(new Specifier("sub_part1"));
+    partitions.addSpecifier(new Specifier("sub_part2"));
+    partitions.addSpecifier(new Specifier("sub_part3"));
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(store.existTable(tableName));
+    store.addTable(desc);
+    try {
+      assertTrue(store.existTable(tableName));
+
+      TableDesc retrieved = store.getTable(tableName);
+
+      // Schema order check
+      // NOTE(review): the partition metadata of 'retrieved' (specifiers included) is not
+      // asserted here; consider verifying it once the store round-trip is confirmed.
+      assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be removed even when a check above fails.
+      store.deleteTable(tableName);
+    }
+    assertFalse(store.existTable(tableName));
+  }
+
+  /**
+   * Stores a LIST-partitioned table with two value-list specifiers, reads it back from
+   * the store, checks that the schema column order survived, and deletes the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByList() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+
+    partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+    partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(store.existTable(tableName));
+    store.addTable(desc);
+    try {
+      assertTrue(store.existTable(tableName));
+
+      TableDesc retrieved = store.getTable(tableName);
+
+      // Schema order check
+      // NOTE(review): the partition metadata of 'retrieved' (specifier expressions included)
+      // is not asserted here; consider verifying it once the store round-trip is confirmed.
+      assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be removed even when a check above fails.
+      store.deleteTable(tableName);
+    }
+    assertFalse(store.existTable(tableName));
+  }
+
+  /**
+   * Stores a RANGE-partitioned table with two bounded specifiers and one without an
+   * expression, reads it back from the store, checks that the schema column order
+   * survived, and deletes the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByRange() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+
+    partitions.addSpecifier(new Specifier("sub_part1", "2"));
+    partitions.addSpecifier(new Specifier("sub_part2", "5"));
+    partitions.addSpecifier(new Specifier("sub_part3"));
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(store.existTable(tableName));
+    store.addTable(desc);
+    try {
+      assertTrue(store.existTable(tableName));
+
+      TableDesc retrieved = store.getTable(tableName);
+
+      // Schema order check
+      // NOTE(review): the partition metadata of 'retrieved' (range bounds included) is not
+      // asserted here; consider verifying it once the store round-trip is confirmed.
+      assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be removed even when a check above fails.
+      store.deleteTable(tableName);
+    }
+    assertFalse(store.existTable(tableName));
+  }
+
+  /**
+   * Stores a COLUMN-partitioned table on "id", reads it back from the store, checks
+   * that the schema column order survived, and deletes the table.
+   */
+  @Test
+  public final void testAddAndDeleteTablePartitionByColumn() throws Exception {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4)
+        .addColumn("name", Type.TEXT)
+        .addColumn("age", Type.INT4)
+        .addColumn("score", Type.FLOAT8);
+
+    String tableName = "addedtable";
+    Options opts = new Options();
+    opts.put("file.delimiter", ",");
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
+
+    Partitions partitions = new Partitions();
+    partitions.addColumn(new Column("id", Type.INT4));
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+
+    TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), tableName));
+    desc.setPartitions(partitions);
+    assertFalse(store.existTable(tableName));
+    store.addTable(desc);
+    try {
+      assertTrue(store.existTable(tableName));
+
+      TableDesc retrieved = store.getTable(tableName);
+
+      // Schema order check
+      // NOTE(review): the partition metadata of 'retrieved' is not asserted here; consider
+      // verifying type and columns once the store round-trip is confirmed.
+      assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
+    } finally {
+      // The other partition tests in this class reuse the same table name, so the table
+      // must be removed even when a check above fails.
+      store.deleteTable(tableName);
+    }
+    assertFalse(store.existTable(tableName));
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
index c94260f..58eabfd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -27,8 +27,10 @@ import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.algebra.CreateTable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.client.TajoClient;
@@ -463,6 +465,40 @@ public class TajoCli {
       }
       sb.append("\n");
     }
+
+    // Partition section of the table description. The header is always printed; the
+    // details follow only when the table descriptor carries partition metadata.
+    sb.append("\n");
+    sb.append("Partitions: \n");
+    if (desc.getPartitions() != null) {
+      sb.append("type:").append(desc.getPartitions().getPartitionsType().name()).append("\n");
+      if (desc.getPartitions().getNumPartitions() > 0) {
+        sb.append("numbers:").append(desc.getPartitions().getNumPartitions()).append("\n");
+      }
+
+      sb.append("columns:").append("\n");
+      for (Column eachColumn : desc.getPartitions().getColumns()) {
+        sb.append("  ");
+        sb.append(eachColumn.getColumnName()).append("\t").append(eachColumn.getDataType().getType());
+        if (eachColumn.getDataType().hasLength()) {
+          sb.append("(").append(eachColumn.getDataType().getLength()).append(")");
+        }
+        sb.append("\n");
+      }
+
+      if (desc.getPartitions().getSpecifiers() != null) {
+        // The partition type is matched by name. Comparing the String returned by name()
+        // with the enum constant itself (as before) is always false, so the MAXVALUE
+        // marker was never printed.
+        boolean rangePartitioned = desc.getPartitions().getPartitionsType().name()
+            .equals(CreateTable.PartitionType.RANGE.name());
+
+        sb.append("specifier:").append("\n");
+        for (Specifier specifier : desc.getPartitions().getSpecifiers()) {
+          // A specifier may have no expression (null or empty); for RANGE partitions that
+          // denotes the open-ended last partition.
+          String expressions = specifier.getExpressions();
+          sb.append("  ");
+          sb.append("name:").append(specifier.getName());
+          if (expressions != null && !expressions.equals("")) {
+            sb.append(", expressions:").append(expressions);
+          } else if (rangePartitioned) {
+            sb.append(", expressions: MAXVALUE");
+          }
+          sb.append("\n");
+        }
+      }
+    }
+
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 4a305ae..5bba89b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -222,6 +222,18 @@ public class LogicalPlan {
         return ensureUniqueColumn(candidates);
       }
 
+      // Trying to find columns from schema in current block.
+      if (block.getSchema() != null) {
+        Column found = block.getSchema().getColumnByName(columnRef.getName());
+        if (found != null) {
+          candidates.add(found);
+        }
+      }
+
+      if (!candidates.isEmpty()) {
+        return ensureUniqueColumn(candidates);
+      }
+
       throw new VerifyException("ERROR: no such a column name "+ columnRef.getCanonicalName());
     }
   }
@@ -724,7 +736,6 @@ public class LogicalPlan {
         // add target to list if a target can be evaluated at this node
         List<Integer> newEvaluatedTargetIds = new ArrayList<Integer>();
         for (int i = 0; i < getTargetListNum(); i++) {
-
           if (getTarget(i) != null && !isTargetResolved(i)) {
             EvalNode expr = getTarget(i).getEvalTree();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 79f02d1..b5f8d2f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
@@ -46,6 +48,7 @@ import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.exception.InternalException;
+import org.apache.tajo.util.TUtil;
 
 import java.util.List;
 import java.util.Stack;
@@ -732,7 +735,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   public LogicalNode visitCreateTable(PlanContext context, Stack<OpType> stack, CreateTable expr)
       throws PlanningException {
 
-     String tableName = expr.getTableName();
+    String tableName = expr.getTableName();
 
     if (expr.hasSubQuery()) {
       stack.add(OpType.CreateTable);
@@ -789,10 +792,125 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
         createTableNode.setPath(new Path(expr.getLocation()));
       }
 
+      if (expr.hasPartition()) {
+        createTableNode.setPartitions(convertTableElementsPartition(context, expr));
+      }
       return createTableNode;
     }
   }
 
+  /**
+   * Converts the partition clause of a CREATE TABLE expression into a catalog
+   * {@link Partitions} description.
+   *
+   * <ul>
+   *   <li>HASH: partition columns, optional number of partitions and specifier names; when
+   *   no specifier is given but a positive number of partitions is, names of the form
+   *   {@code <TYPE>_<table>_<index>} are generated.</li>
+   *   <li>LIST: partition columns plus one specifier per value list; the values are rendered
+   *   as a comma-separated string.</li>
+   *   <li>RANGE: partition columns plus one specifier per range; the end expression is
+   *   rendered as a string, and a MAXVALUE end is stored as a null expression.</li>
+   *   <li>COLUMN: partition columns only.</li>
+   * </ul>
+   *
+   * @param context planning context; the schema of its current block is set to the table
+   *                schema before each partition value expression is converted
+   * @param expr    the CREATE TABLE expression
+   * @return the partition description, or null when the expression has no partition clause
+   * @throws PlanningException declared for failures while converting partition expressions
+   */
+  private Partitions convertTableElementsPartition(PlanContext context,
+                                                   CreateTable expr) throws PlanningException {
+    if (!expr.hasPartition()) {
+      return null;
+    }
+
+    Schema schema = convertTableElementsSchema(expr.getTableElements());
+    Partitions partitions = new Partitions();
+    List<Specifier> specifiers = TUtil.newList();
+
+    partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
+        .getPartitionType().name()));
+
+    if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.HASH)) {
+      CreateTable.HashPartition hashPartition = expr.getPartition();
+
+      // Check the column references before converting them: the conversion iterates over
+      // the references and would fail on null.
+      if (hashPartition.getColumns() != null) {
+        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+            , hashPartition.getColumns()));
+
+        if (hashPartition.getQuantifier() != null) {
+          String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
+          partitions.setNumPartitions(Integer.parseInt(quantity));
+        }
+
+        if (hashPartition.getSpecifiers() != null) {
+          for (CreateTable.PartitionSpecifier eachSpec : hashPartition.getSpecifiers()) {
+            specifiers.add(new Specifier(eachSpec.getName()));
+          }
+        }
+
+        // No explicit partition names: generate one per partition.
+        if (specifiers.isEmpty() && partitions.getNumPartitions() > 0) {
+          for (int i = 0; i < partitions.getNumPartitions(); i++) {
+            String partitionName = partitions.getPartitionsType().name() + "_" + expr
+                .getTableName() + "_" + i;
+            specifiers.add(new Specifier(partitionName));
+          }
+        }
+
+        if (!specifiers.isEmpty()) {
+          partitions.setSpecifiers(specifiers);
+        }
+      }
+    } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.LIST)) {
+      CreateTable.ListPartition listPartition = expr.getPartition();
+
+      partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+          , listPartition.getColumns()));
+
+      if (listPartition.getSpecifiers() != null) {
+        StringBuilder sb = new StringBuilder();
+
+        for (CreateTable.ListPartitionSpecifier eachSpec : listPartition.getSpecifiers()) {
+          Specifier specifier = new Specifier(eachSpec.getName());
+          sb.setLength(0);
+          for (Expr eachExpr : eachSpec.getValueList().getValues()) {
+            context.block.setSchema(schema);
+            EvalNode eval = createEvalTree(context.plan, context.block, eachExpr);
+            // Separate every value after the first. Testing for "> 1" would drop the comma
+            // when the first value renders as a single character.
+            if (sb.length() > 0) {
+              sb.append(",");
+            }
+
+            sb.append(eval.toString());
+          }
+          specifier.setExpressions(sb.toString());
+          specifiers.add(specifier);
+        }
+        if (!specifiers.isEmpty()) {
+          partitions.setSpecifiers(specifiers);
+        }
+      }
+    } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.RANGE)) {
+      CreateTable.RangePartition rangePartition = expr.getPartition();
+
+      partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+          , rangePartition.getColumns()));
+
+      if (rangePartition.getSpecifiers() != null) {
+        for (CreateTable.RangePartitionSpecifier eachSpec : rangePartition.getSpecifiers()) {
+          Specifier specifier = new Specifier();
+
+          if (eachSpec.getName() != null) {
+            specifier.setName(eachSpec.getName());
+          }
+
+          if (eachSpec.getEnd() != null) {
+            context.block.setSchema(schema);
+            EvalNode eval = createEvalTree(context.plan, context.block, eachSpec.getEnd());
+            specifier.setExpressions(eval.toString());
+          }
+
+          // MAXVALUE is stored as "no expression".
+          // NOTE(review): consumers that call getExpressions().equals("") need Specifier to
+          // normalize null to an empty string - confirm against Specifier.
+          if (eachSpec.isEndMaxValue()) {
+            specifier.setExpressions(null);
+          }
+          specifiers.add(specifier);
+        }
+        if (!specifiers.isEmpty()) {
+          partitions.setSpecifiers(specifiers);
+        }
+      }
+    } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.COLUMN)) {
+      CreateTable.ColumnPartition columnPartition = expr.getPartition();
+
+      partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+          , columnPartition.getColumns()));
+    }
+
+    return partitions;
+  }
 
 
   /**
@@ -811,6 +929,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return schema;
   }
 
+  /**
+   * Converts the column definitions that are named by the given column references into
+   * catalog columns. Names are matched case-insensitively. The result follows the order
+   * of {@code elements}, not the order of {@code references}.
+   *
+   * @param elements   column definitions of the table being created
+   * @param references column references to look up; may be null
+   * @return the matching columns; an empty list when {@code references} is null or
+   *         nothing matches
+   */
+  private List<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
+                                                   ColumnReferenceExpr[] references) {
+    List<Column> columnList = TUtil.newList();
+
+    // A caller in this class treats the partition's column references as nullable, so a
+    // missing reference list yields no columns instead of a NullPointerException.
+    if (references == null) {
+      return columnList;
+    }
+
+    for (CreateTable.ColumnDefinition columnDefinition : elements) {
+      for (ColumnReferenceExpr eachReference : references) {
+        if (columnDefinition.getColumnName().equalsIgnoreCase(eachReference.getName())) {
+          columnList.add(convertColumn(columnDefinition));
+        }
+      }
+    }
+
+    return columnList;
+  }
+
   private DataType convertDataType(org.apache.tajo.algebra.DataType dataType) {
     TajoDataTypes.Type type = TajoDataTypes.Type.valueOf(dataType.getTypeName());
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 50656c5..942309d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.partition.Partitions;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
@@ -35,6 +36,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
   @Expose private Path path;
   @Expose private Options options;
   @Expose private boolean external;
+  @Expose private Partitions partitions;
 
   public CreateTableNode(int pid, String tableName, Schema schema) {
     super(pid, NodeType.CREATE_TABLE);
@@ -90,6 +92,17 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     this.external = external;
   }
 
+  public Partitions getPartitions() { // partition definition held by this CREATE TABLE node
+    return partitions; // may be null; hasPartition() reports whether it is set
+  }
+
+  public void setPartitions(Partitions partitions) { // replaces the partition definition
+    this.partitions = partitions; // reference is stored as-is, no copy is made here
+  }
+
+  public boolean hasPartition() { // true when the partitions field is non-null
+    return this.partitions != null;
+  }
 
   @Override
   public PlanString getPlanString() {
@@ -107,7 +120,8 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
           && this.external == other.external
           && TUtil.checkEquals(path, other.path)
           && TUtil.checkEquals(options, other.options)
-          && TUtil.checkEquals(partitionKeys, other.partitionKeys);
+          && TUtil.checkEquals(partitionKeys, other.partitionKeys)
+          && TUtil.checkEquals(partitions, other.partitions);
     } else {
       return false;
     }
@@ -123,6 +137,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     store.path = path != null ? new Path(path.toString()) : null;
     store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
     store.options = (Options) (options != null ? options.clone() : null);
+    store.partitions = (Partitions) (partitions != null ? partitions.clone() : null);
     return store;
   }
   
@@ -142,6 +157,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     sb.append(",\"storeType\": \"" + this.storageType);
     sb.append(",\"path\" : \"" + this.path).append("\",");
     sb.append(",\"external\" : \"" + this.external).append("\",");
+    sb.append(",\"partitions\" : \"" + this.partitions).append("\",");
     
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
     .append("\n  \"in schema\": ").append(getInSchema())

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 94447c0..b2bd937 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
+import org.apache.tajo.catalog.partition.Partitions;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
@@ -38,12 +39,19 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   @Expose private Options options;
   @Expose private boolean isCreatedTable = false;
   @Expose private boolean isOverwritten = false;
+  @Expose private Partitions partitions;
 
   public StoreTableNode(int pid, String tableName) {
     super(pid, NodeType.STORE);
     this.tableName = tableName;
   }
 
+  public StoreTableNode(int pid, String tableName, Partitions partitions) {
+    // Same initialization as the two-argument constructor, plus the partition definition.
+    this(pid, tableName);
+    this.partitions = partitions;
+  }
+
   public final String getTableName() {
     return this.tableName;
   }
@@ -101,6 +109,13 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     return this.options;
   }
 
+  public Partitions getPartitions() { // partition definition of the stored table
+    return partitions; // may be null when no partition definition was supplied
+  }
+
+  public void setPartitions(Partitions partitions) { // replaces the partition definition
+    this.partitions = partitions; // reference is stored as-is, no copy is made here
+  }
 
   @Override
   public PlanString getPlanString() {
@@ -131,6 +146,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
       eq = eq && TUtil.checkEquals(options, other.options);
       eq = eq && isCreatedTable == other.isCreatedTable;
       eq = eq && isOverwritten == other.isOverwritten;
+      eq = eq && TUtil.checkEquals(partitions, other.partitions);
       return eq;
     } else {
       return false;
@@ -147,6 +163,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     store.options = options != null ? (Options) options.clone() : null;
     store.isCreatedTable = isCreatedTable;
     store.isOverwritten = isOverwritten;
+    store.partitions = partitions;
     return store;
   }
 
@@ -169,8 +186,13 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     }
     
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
-    .append("\n  \"in schema\": ").append(getInSchema())
-    .append("}");
+    .append("\n  \"in schema\": ").append(getInSchema());
+
+    if(partitions != null) {
+      sb.append(partitions.toString());
+    }
+
+    sb.append("}");
     
     return sb.toString() + "\n"
         + getChild().toString();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 505fd71..4f18b11 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -32,6 +32,7 @@ import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
 import org.apache.tajo.catalog.exception.NoSuchTableException;
+import org.apache.tajo.catalog.partition.Partitions;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
@@ -131,7 +132,7 @@ public class GlobalEngine extends AbstractService {
       }
 
       Expr planningContext = hiveQueryMode ? converter.parse(sql) : analyzer.parse(sql);
-      
+
       LogicalPlan plan = createLogicalPlan(planningContext);
       LogicalRootNode rootNode = plan.getRootBlock().getRoot();
 
@@ -253,10 +254,11 @@ public class GlobalEngine extends AbstractService {
     }
 
     return createTableOnDirectory(createTable.getTableName(), createTable.getSchema(), meta,
-        createTable.getPath(), true);
+        createTable.getPath(), true, createTable.getPartitions());
   }
 
-  public TableDesc createTableOnDirectory(String tableName, Schema schema, TableMeta meta, Path path, boolean isCreated)
+  public TableDesc createTableOnDirectory(String tableName, Schema schema, TableMeta meta,
+                                          Path path, boolean isCreated, Partitions partitions)
       throws IOException {
     if (catalog.existsTable(tableName)) {
       throw new AlreadyExistsTableException(tableName);
@@ -284,6 +286,7 @@ public class GlobalEngine extends AbstractService {
     stats.setNumBytes(totalSize);
     TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
     desc.setStats(stats);
+    desc.setPartitions(partitions);
     catalog.addTable(desc);
 
     LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index d1faf4f..03cb4d0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -32,6 +32,7 @@ import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.NoSuchTableException;
+import org.apache.tajo.catalog.partition.Partitions;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ipc.ClientProtos;
@@ -307,11 +308,12 @@ public class TajoMasterClientService extends AbstractService {
 
         Schema schema = new Schema(request.getSchema());
         TableMeta meta = new TableMeta(request.getMeta());
+        Partitions partitions = new Partitions(request.getPartitions());
 
         TableDesc desc;
         try {
-          desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), schema, meta, path,
-              false);
+          desc = context.getGlobalEngine().createTableOnDirectory(request.getName(), schema,
+              meta, path, false, partitions);
         } catch (Exception e) {
           return TableResponse.newBuilder()
               .setResultCode(ResultCode.ERROR)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index 89c40c8..dc9c905 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -124,6 +124,7 @@ message CreateTableRequest {
   required SchemaProto schema = 2;
   required TableProto meta = 3;
   required string path = 4;
+  optional PartitionsProto partitions = 5;
 }
 
 message AttachTableRequest {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index bd62f40..8b06ebf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -191,4 +191,126 @@ public class TestTajoClient {
     assertEquals(tableName1, desc.getName());
     assertTrue(desc.getStats().getNumBytes() > 0);
   }
+
+  @Test
+  public final void testCreateAndDropTablePartitionedHash1ByExecuteQuery() throws IOException,
+      ServiceException, SQLException {
+    TajoConf conf = cluster.getConfiguration();
+    final String tableName = "testCreateAndDropTablePartitionedHash1";
+
+    assertFalse(client.existTable(tableName));
+    // HASH partitioning declared with an explicit list of three named partitions.
+    String sql = "create table " + tableName + " (deptname text, score int4)";
+    sql += " PARTITION BY HASH (deptname)";
+    sql += " (PARTITION sub_part1, PARTITION sub_part2, PARTITION sub_part3)";
+    // After the DDL runs, the client must report the table as existing.
+    client.updateQuery(sql);
+    assertTrue(client.existTable(tableName));
+    // The path recorded in the table descriptor must exist on its file system.
+    Path tablePath = client.getTableDesc(tableName).getPath();
+    FileSystem hdfs = tablePath.getFileSystem(conf);
+    assertTrue(hdfs.exists(tablePath));
+    // Dropping the table must remove both the table entry and its path.
+    client.updateQuery("drop table " + tableName);
+    assertFalse(client.existTable(tableName));
+    assertFalse(hdfs.exists(tablePath));
+  }
+
+  @Test
+  public final void testCreateAndDropTablePartitionedHash2ByExecuteQuery() throws IOException,
+      ServiceException, SQLException {
+    TajoConf conf = cluster.getConfiguration();
+    final String tableName = "testCreateAndDropTablePartitionedHash2";
+
+    assertFalse(client.existTable(tableName));
+    // HASH partitioning given only by a partition count (PARTITIONS 2).
+    String sql = "create table " + tableName + " (deptname text, score int4)";
+    sql += "PARTITION BY HASH (deptname)"; // no leading space: directly follows ")"
+    sql += "PARTITIONS 2"; // no leading space: directly follows ")"
+    // After the DDL runs, the client must report the table as existing.
+    client.updateQuery(sql);
+    assertTrue(client.existTable(tableName));
+    // The path recorded in the table descriptor must exist on its file system.
+    Path tablePath = client.getTableDesc(tableName).getPath();
+    FileSystem hdfs = tablePath.getFileSystem(conf);
+    assertTrue(hdfs.exists(tablePath));
+    // Dropping the table must remove both the table entry and its path.
+    client.updateQuery("drop table " + tableName);
+    assertFalse(client.existTable(tableName));
+    assertFalse(hdfs.exists(tablePath));
+  }
+
+  @Test
+  public final void testCreateAndDropTablePartitionedListByExecuteQuery() throws IOException,
+      ServiceException, SQLException {
+    TajoConf conf = cluster.getConfiguration();
+    final String tableName = "testCreateAndDropTablePartitionedList";
+
+    assertFalse(client.existTable(tableName));
+    // LIST partitioning with two named partitions, each enumerating its member values.
+    String sql = "create table " + tableName + " (deptname text, score int4)";
+    sql += "PARTITION BY LIST (deptname)"; // no leading space: directly follows ")"
+    sql += "( PARTITION sub_part1 VALUES('r&d', 'design'),";
+    sql += "PARTITION sub_part2 VALUES('sales', 'hr') )";
+    // After the DDL runs, the client must report the table as existing.
+    client.updateQuery(sql);
+    assertTrue(client.existTable(tableName));
+    // The path recorded in the table descriptor must exist on its file system.
+    Path tablePath = client.getTableDesc(tableName).getPath();
+    FileSystem hdfs = tablePath.getFileSystem(conf);
+    assertTrue(hdfs.exists(tablePath));
+    // Dropping the table must remove both the table entry and its path.
+    client.updateQuery("drop table " + tableName);
+    assertFalse(client.existTable(tableName));
+    assertFalse(hdfs.exists(tablePath));
+  }
+
+  @Test
+  public final void testCreateAndDropTablePartitionedRangeByExecuteQuery() throws IOException,
+      ServiceException, SQLException {
+    TajoConf conf = cluster.getConfiguration();
+    final String tableName = "testCreateAndDropTablePartitionedRange";
+
+    assertFalse(client.existTable(tableName));
+    // RANGE partitioning with three distinctly named partitions; the last is open-ended.
+    String sql = "create table " + tableName + " (deptname text, score int4)";
+    sql += "PARTITION BY RANGE (score)"; // no leading space: directly follows ")"
+    sql += "( PARTITION sub_part1 VALUES LESS THAN (2),";
+    sql += "PARTITION sub_part2 VALUES LESS THAN (5),";
+    sql += "PARTITION sub_part3 VALUES LESS THAN (MAXVALUE) )"; // was a second sub_part2
+    // After the DDL runs, the client must report the table as existing.
+    client.updateQuery(sql);
+    assertTrue(client.existTable(tableName));
+    // The path recorded in the table descriptor must exist on its file system.
+    Path tablePath = client.getTableDesc(tableName).getPath();
+    FileSystem hdfs = tablePath.getFileSystem(conf);
+    assertTrue(hdfs.exists(tablePath));
+    // Dropping the table must remove both the table entry and its path.
+    client.updateQuery("drop table " + tableName);
+    assertFalse(client.existTable(tableName));
+    assertFalse(hdfs.exists(tablePath));
+  }
+  @Test
+  public final void testCreateAndDropTablePartitionedColumnByExecuteQuery() throws IOException,
+      ServiceException, SQLException {
+    TajoConf conf = cluster.getConfiguration();
+    final String tableName = "testCreateAndDropTablePartitionedColumn";
+
+    assertFalse(client.existTable(tableName));
+    // COLUMN partitioning names only the partition column; no partition list follows.
+    String sql = "create table " + tableName + " (deptname text, score int4)";
+    sql += "PARTITION BY COLUMN (deptname)"; // no leading space: directly follows ")"
+    // After the DDL runs, the client must report the table as existing.
+    client.updateQuery(sql);
+    assertTrue(client.existTable(tableName));
+    // The path recorded in the table descriptor must exist on its file system.
+    Path tablePath = client.getTableDesc(tableName).getPath();
+    FileSystem hdfs = tablePath.getFileSystem(conf);
+    assertTrue(hdfs.exists(tablePath));
+    // Dropping the table must remove both the table entry and its path.
+    client.updateQuery("drop table " + tableName);
+    assertFalse(client.existTable(tableName));
+    assertFalse(hdfs.exists(tablePath));
+  }
+
 }


[2/2] git commit: TAJO-284: Add table partitioning entry to Catalog. (jaehwa)

Posted by bl...@apache.org.
TAJO-284: Add table partitioning entry to Catalog. (jaehwa)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/0b0de13b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/0b0de13b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/0b0de13b

Branch: refs/heads/master
Commit: 0b0de13b2444f9e75ad3e1f42cba51ddb1f86dc2
Parents: 29a0aa0
Author: blrunner <jh...@gruter.com>
Authored: Tue Dec 3 18:29:22 2013 +0900
Committer: blrunner <jh...@gruter.com>
Committed: Tue Dec 3 18:29:22 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../org/apache/tajo/algebra/CreateTable.java    |  11 +-
 .../apache/tajo/catalog/CatalogConstants.java   |   1 +
 .../org/apache/tajo/catalog/CatalogUtil.java    |   3 +
 .../org/apache/tajo/catalog/DDLBuilder.java     |  70 +++
 .../java/org/apache/tajo/catalog/Schema.java    |   2 +-
 .../java/org/apache/tajo/catalog/TableDesc.java |  33 +-
 .../tajo/catalog/partition/Partitions.java      | 349 +++++++++++++
 .../tajo/catalog/partition/Specifier.java       | 128 +++++
 .../src/main/proto/CatalogProtos.proto          |  20 +
 tajo-catalog/tajo-catalog-server/pom.xml        |   4 +
 .../org/apache/tajo/catalog/CatalogServer.java  |   5 +-
 .../tajo/catalog/store/AbstractDBStore.java     | 184 ++++++-
 .../apache/tajo/catalog/store/DerbyStore.java   | 488 +++++++++++++------
 .../apache/tajo/catalog/store/MySQLStore.java   | 204 +++++---
 .../org/apache/tajo/catalog/TestCatalog.java    | 225 ++++++++-
 .../org/apache/tajo/catalog/TestDBStore.java    | 174 +++++++
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  36 ++
 .../apache/tajo/engine/planner/LogicalPlan.java |  13 +-
 .../tajo/engine/planner/LogicalPlanner.java     | 135 ++++-
 .../engine/planner/logical/CreateTableNode.java |  18 +-
 .../engine/planner/logical/StoreTableNode.java  |  26 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   9 +-
 .../tajo/master/TajoMasterClientService.java    |   6 +-
 .../src/main/proto/ClientProtos.proto           |   1 +
 .../org/apache/tajo/client/TestTajoClient.java  | 122 +++++
 26 files changed, 2022 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ab0e893..8c53d9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -46,6 +46,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-284: Add table partitioning entry to Catalog. (jaehwa)
+
     TAJO-317: Improve TajoResourceManager to support more elaborate resource 
     management. (Keuntae Park via jihoon)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
index 6e36f3a..41276ad 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
@@ -279,7 +279,6 @@ public class CreateTable extends Expr {
   }
 
   public static class RangePartitionSpecifier extends PartitionSpecifier {
-    String name;
     Expr end;
     boolean maxValue;
 
@@ -293,10 +292,6 @@ public class CreateTable extends Expr {
       maxValue = true;
     }
 
-    public String getName() {
-      return name;
-    }
-
     public Expr getEnd() {
       return end;
     }
@@ -320,10 +315,14 @@ public class CreateTable extends Expr {
   }
 
   public static class PartitionSpecifier {
-    String name;
+    private String name;
 
     public PartitionSpecifier(String name) {
       this.name = name;
     }
+
+    public String getName() {
+      return this.name;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index 4b1f794..ed23b08 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -35,6 +35,7 @@ public class CatalogConstants {
   public static final String TB_OPTIONS = "OPTIONS";
   public static final String TB_INDEXES = "INDEXES";
   public static final String TB_STATISTICS = "STATS";
+  public static final String TB_PARTTIONS = "PARTITIONS";
   public static final String C_TABLE_ID = "TABLE_ID";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index 6b4848c..dc91035 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -27,6 +27,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.sql.PreparedStatement;
 import java.sql.Wrapper;
 import java.util.Collection;
 
@@ -148,6 +149,8 @@ public class CatalogUtil {
       try{
         if(w instanceof Statement){
           ((Statement)w).close();
+        } else if(w instanceof PreparedStatement){
+            ((PreparedStatement)w).close();
         } else if(w instanceof ResultSet){
           ((ResultSet)w).close();
         } else if(w instanceof Connection){

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
index e6cc46d..a9d0f03 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
@@ -18,6 +18,9 @@
 
 package org.apache.tajo.catalog;
 
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 
 import java.util.Map;
@@ -38,6 +41,10 @@ public class DDLBuilder {
     buildWithClause(sb, desc.getMeta());
     buildLocationClause(sb, desc);
 
+    if (desc.getPartitions() != null) {
+      buildPartitionClause(sb, desc);
+    }
+
     sb.append(";");
     return sb.toString();
   }
@@ -87,4 +94,67 @@ public class DDLBuilder {
   private static void buildLocationClause(StringBuilder sb, TableDesc desc) {
     sb.append(" LOCATION '").append(desc.getPath()).append("'");
   }
+
+  private static void buildPartitionClause(StringBuilder sb, TableDesc desc) {
+    // Appends " PARTITION BY <type>(<key columns>)" to sb and, for every partitioning
+    // type except COLUMN, the parenthesized list of partition specifiers.
+    // desc.getPartitions() is dereferenced without a null check.
+    final Partitions partitionInfo = desc.getPartitions();
+    sb.append(" PARTITION BY ").append(partitionInfo.getPartitionsType().name());
+
+    // Key columns: a name is printed for each schema column carrying the same name.
+    sb.append("(");
+    String columnSeparator = "";
+    for (Column keyColumn : partitionInfo.getColumns()) {
+      for (Column schemaColumn : desc.getSchema().getColumns()) {
+        if (!keyColumn.getColumnName().equals(schemaColumn.getColumnName())) {
+          continue;
+        }
+        sb.append(columnSeparator).append(keyColumn.getColumnName());
+        columnSeparator = ",";
+      }
+    }
+    sb.append(")");
+
+    // Nothing else to print without specifiers or for COLUMN partitioning.
+    if (partitionInfo.getSpecifiers() == null
+        || partitionInfo.getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
+      return;
+    }
+
+    sb.append(" (");
+    String specifierSeparator = "";
+    for (Specifier each : partitionInfo.getSpecifiers()) {
+      sb.append(specifierSeparator).append(" PARTITION");
+      specifierSeparator = ",";
+
+      // The name is printed only when it is non-empty.
+      if (!each.getName().isEmpty()) {
+        sb.append(" ").append(each.getName());
+      }
+
+      if (partitionInfo.getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
+        // LIST: the comma-separated expression string becomes quoted literals.
+        if (!each.getExpressions().isEmpty()) {
+          sb.append(" VALUES (");
+          String valueSeparator = "";
+          for (String value : each.getExpressions().split("\\,")) {
+            sb.append(valueSeparator).append("'").append(value).append("'");
+            valueSeparator = ",";
+          }
+          sb.append(")");
+        }
+      } else if (partitionInfo.getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE)) {
+        // RANGE: an empty expression string is rendered as MAXVALUE.
+        sb.append(" VALUES LESS THAN (");
+        if (each.getExpressions().isEmpty()) {
+          sb.append("MAXVALUE");
+        } else {
+          sb.append(each.getExpressions());
+        }
+        sb.append(")");
+      }
+    }
+    sb.append(")");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 7c0de81..8a2d028 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -212,7 +212,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
     }
   }
 
-	@Override
+  @Override
 	public boolean equals(Object o) {
 		if (o instanceof Schema) {
 		  Schema other = (Schema) o;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index f59feef..458a99a 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -21,8 +21,11 @@ package org.apache.tajo.catalog;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.partition.Partitions;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -30,6 +33,8 @@ import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.json.GsonObject;
 
 public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Cloneable {
+  private final Log LOG = LogFactory.getLog(TableDesc.class);
+
   protected TableDescProto.Builder builder = null;
   
 	@Expose protected String tableName; // required
@@ -37,6 +42,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
   @Expose protected TableMeta meta; // required
   @Expose protected Path uri; // required
   @Expose	protected TableStats stats; // optional
+  @Expose protected Partitions partitions; //optional
   
 	public TableDesc() {
 		builder = TableDescProto.newBuilder();
@@ -48,7 +54,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
 	  this.tableName = tableName.toLowerCase();
     this.schema = schema;
 	  this.meta = info;
-	  this.uri = path;	   
+	  this.uri = path;
 	}
 	
 	public TableDesc(String tableName, Schema schema, StoreType type, Options options, Path path) {
@@ -58,6 +64,9 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
 	public TableDesc(TableDescProto proto) {
 	  this(proto.getId(), new Schema(proto.getSchema()), new TableMeta(proto.getMeta()), new Path(proto.getPath()));
     this.stats = new TableStats(proto.getStats());
+    if (proto.hasPartitions()) { // optional field: the generated getter never returns null
+      this.partitions = new Partitions(proto.getPartitions()); // stays null when absent
+    }
 	}
 	
 	public void setName(String tableId) {
@@ -104,8 +113,20 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
   public TableStats getStats() {
     return this.stats;
   }
-	
-	public boolean equals(Object object) {
+
+  public boolean hasPartitions() { // true when the partitions field is non-null
+    return this.partitions != null;
+  }
+
+  public Partitions getPartitions() { // optional partition definition of this table
+    return partitions; // may be null; hasPartitions() reports whether it is set
+  }
+
+  public void setPartitions(Partitions partitions) { // replaces the partition definition
+    this.partitions = partitions; // reference is stored as-is, no copy is made here
+  }
+
+  public boolean equals(Object object) {
     if(object instanceof TableDesc) {
       TableDesc other = (TableDesc) object;
       
@@ -123,6 +144,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
     desc.meta = (TableMeta) meta.clone();
     desc.uri = uri;
     desc.stats = stats != null ? (TableStats) stats.clone() : null;
+    desc.partitions = partitions != null ? (Partitions) partitions.clone() : null;
 	  
 	  return desc;
 	}
@@ -154,7 +176,10 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
       builder.setPath(this.uri.toString());
     }
     if (this.stats != null) {
-      builder.setStats(stats.getProto());
+      builder.setStats(this.stats.getProto());
+    }
+    if (this.partitions != null) {
+      builder.setPartitions(this.partitions.getProto());
     }
     return builder.build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
new file mode 100644
index 0000000..c82f0cb
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.exception.AlreadyExistsFieldException;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.*;
+
+public class Partitions implements ProtoObject<CatalogProtos.PartitionsProto>, Cloneable, GsonObject {
+
+  private static final Log LOG = LogFactory.getLog(Partitions.class);
+
+  @Expose protected CatalogProtos.PartitionsType partitionsType; //required
+  @Expose protected List<Column> columns; //required
+  @Expose protected int numPartitions; //optional
+  @Expose protected List<Specifier> specifiers; //optional
+  @Expose protected Map<String, Integer> columnsByQialifiedName = null;
+  @Expose protected Map<String, List<Integer>> columnsByName = null;
+
+  private CatalogProtos.PartitionsProto.Builder builder = CatalogProtos.PartitionsProto.newBuilder();
+
+  /**
+   * Creates an empty partition description: no columns and empty lookup indexes.
+   * The partition type and the specifier list are left unset (null).
+   */
+  public Partitions() {
+    this.columns = new ArrayList<Column>();
+    this.columnsByQialifiedName = new TreeMap<String, Integer>();
+    this.columnsByName = new HashMap<String, List<Integer>>();
+  }
+
+  /**
+   * Copy constructor. The new object gets its own column list, lookup indexes and
+   * specifier list, so adding columns or specifiers to the copy does not affect
+   * {@code partition}. The Column and Specifier elements themselves are shared.
+   *
+   * @param partition the partition description to copy
+   */
+  public Partitions(Partitions partition) {
+    this();
+    this.partitionsType = partition.partitionsType;
+    this.columns.addAll(partition.columns);
+    this.columnsByQialifiedName.putAll(partition.columnsByQialifiedName);
+    // Copy every id list: putAll() alone would leave both objects pointing at the same lists.
+    for (Map.Entry<String, List<Integer>> entry : partition.columnsByName.entrySet()) {
+      this.columnsByName.put(entry.getKey(), new ArrayList<Integer>(entry.getValue()));
+    }
+    this.numPartitions = partition.numPartitions;
+    // Specifiers are optional; keep null as null, otherwise take a private copy of the list.
+    if (partition.specifiers != null) {
+      this.specifiers = new ArrayList<Specifier>(partition.specifiers);
+    }
+  }
+
+  /**
+   * Creates a partition description from its parts.
+   *
+   * @param partitionsType the partitioning method
+   * @param columns        the partition columns; each one is registered through
+   *                       {@link #addColumn(Column)}
+   * @param numPartitions  the number of partitions
+   * @param specifiers     the partition specifiers; stored by reference, may be null
+   */
+  public Partitions(CatalogProtos.PartitionsType partitionsType, Column[] columns, int numPartitions,
+                   List<Specifier> specifiers) {
+    this();
+    this.partitionsType = partitionsType;
+    for (Column c : columns) {
+      addColumn(c);
+    }
+    this.numPartitions = numPartitions;
+    this.specifiers = specifiers;
+  }
+
+  /**
+   * Rebuilds a partition description from its protobuf form, re-creating the column list
+   * and both lookup indexes from the serialized columns.
+   *
+   * @param proto the serialized partition description
+   */
+  public Partitions(CatalogProtos.PartitionsProto proto) {
+    this.partitionsType = proto.getPartitionsType();
+    this.columns = new ArrayList<Column>();
+    this.columnsByQialifiedName = new HashMap<String, Integer>();
+    this.columnsByName = new HashMap<String, List<Integer>>();
+    for (CatalogProtos.ColumnProto colProto : proto.getColumnsList()) {
+      Column tobeAdded = new Column(colProto);
+      columns.add(tobeAdded);
+      // Index by "qualifier.name" when a qualifier is present, otherwise by the bare name.
+      if (tobeAdded.hasQualifier()) {
+        columnsByQialifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(),
+            columns.size() - 1);
+      } else {
+        columnsByQialifiedName.put(tobeAdded.getColumnName(), columns.size() - 1);
+      }
+      // Several columns may share a simple name; collect all of their positions.
+      if (columnsByName.containsKey(tobeAdded.getColumnName())) {
+        columnsByName.get(tobeAdded.getColumnName()).add(columns.size() - 1);
+      } else {
+        columnsByName.put(tobeAdded.getColumnName(), TUtil.newList(columns.size() - 1));
+      }
+    }
+    this.numPartitions = proto.getNumPartitions();
+    // NOTE(review): protobuf getters for repeated fields normally return an empty list rather
+    // than null, so this check is presumably always true and specifiers ends up empty, not
+    // null, when the message carries no specifiers -- confirm.
+    if(proto.getSpecifiersList() != null) {
+      this.specifiers = TUtil.newList();
+      for(CatalogProtos.SpecifierProto specifier: proto.getSpecifiersList()) {
+        this.specifiers.add(new Specifier(specifier));
+      }
+    }
+  }
+
+  /**
+   * Sets a qualifier on the partition columns without forcing.
+   * Equivalent to {@code setQualifier(qualifier, false)}.
+   *
+   * @param qualifier The qualifier
+   */
+  public void setQualifier(String qualifier) {
+    setQualifier(qualifier, false);
+  }
+
+  /**
+   * Set a qualifier to this schema. This changes the qualifier of all columns if force is true.
+   * Otherwise, it changes the qualifier of all columns except for non-qualified columns
+   *
+   * @param qualifier The qualifier
+   * @param force     If true, all columns' qualifiers will be changed. Otherwise,
+   *                  only qualified columns' qualifiers will
+   *                  be changed.
+   */
+  public void setQualifier(String qualifier, boolean force) {
+    columnsByQialifiedName.clear();
+
+    for (int i = 0; i < getColumnNum(); i++) {
+      if (!force && columns.get(i).hasQualifier()) {
+        continue;
+      }
+      columns.get(i).setQualifier(qualifier);
+      columnsByQialifiedName.put(columns.get(i).getQualifiedName(), i);
+    }
+  }
+
+  /** Returns the number of partition columns. */
+  public int getColumnNum() {
+    return this.columns.size();
+  }
+
+  /** Returns the partition column at the given position in the column list. */
+  public Column getColumn(int id) {
+    return columns.get(id);
+  }
+
+  /**
+   * Looks up a column by its qualified name. The name is lower-cased before the lookup.
+   *
+   * @return the matching column, or null if the qualified-name index has no such entry
+   */
+  public Column getColumnByFQN(String qualifiedName) {
+    Integer cid = columnsByQialifiedName.get(qualifiedName.toLowerCase());
+    return cid != null ? columns.get(cid) : null;
+  }
+
+  /**
+   * Looks up a column by its simple (unqualified) name. The name is lower-cased first.
+   *
+   * @param colName the simple column name
+   * @return the single matching column, or null when no column has that name
+   * @throws RuntimeException when more than one column shares the name
+   */
+  public Column getColumnByName(String colName) {
+    List<Integer> ids = columnsByName.get(colName.toLowerCase());
+    if (ids == null || ids.size() == 0) {
+      return null;
+    }
+    if (ids.size() == 1) {
+      return columns.get(ids.get(0));
+    }
+
+    // Ambiguous: list every candidate column in the error message.
+    StringBuilder candidates = new StringBuilder();
+    for (int i = 0; i < ids.size(); i++) {
+      if (i > 0) {
+        candidates.append(", ");
+      }
+      candidates.append(columns.get(ids.get(i)));
+    }
+    throw new RuntimeException("Ambiguous Column Name: " + candidates.toString());
+  }
+
+  public int getColumnId(String qualifiedName) {
+    return columnsByQialifiedName.get(qualifiedName.toLowerCase());
+  }
+
+  public int getColumnIdByName(String colName) {
+    for (Column col : columns) {
+      if (col.getColumnName().equals(colName.toLowerCase())) {
+        return columnsByQialifiedName.get(col.getQualifiedName());
+      }
+    }
+    return -1;
+  }
+
+  /** Returns an immutable snapshot of the partition columns. */
+  public List<Column> getColumns() {
+    return ImmutableList.copyOf(columns);
+  }
+
+  /**
+   * Replaces the column list. The list is stored by reference and the two lookup
+   * indexes are NOT rebuilt here.
+   * NOTE(review): name-based lookups may be stale after this call -- confirm callers
+   * also update the indexes.
+   */
+  public void setColumns(List<Column> columns) {
+    this.columns = columns;
+  }
+
+  /** Returns true if the qualified-name index has an entry for the lower-cased name. */
+  public boolean contains(String colName) {
+    return columnsByQialifiedName.containsKey(colName.toLowerCase());
+
+  }
+
+  /**
+   * Checks whether every given column is one of the partition columns.
+   *
+   * @param columns the columns to look for
+   * @return true if all of them are contained in this partition description
+   */
+  public boolean containsAll(Collection<Column> columns) {
+    // The parameter shadows the field: without "this." the collection is compared
+    // with itself and the result is always true.
+    return this.columns.containsAll(columns);
+  }
+
+  /**
+   * Adds a column of the given type. A CHAR column added this way gets length 1.
+   *
+   * @return this object, to allow chained calls
+   */
+  public synchronized Partitions addColumn(String name, TajoDataTypes.Type type) {
+    if (type == TajoDataTypes.Type.CHAR) {
+      return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));
+    }
+    return addColumn(name, CatalogUtil.newSimpleDataType(type));
+  }
+
+  /**
+   * Adds a column of the given type with an explicit length.
+   *
+   * @return this object, to allow chained calls
+   */
+  public synchronized Partitions addColumn(String name, TajoDataTypes.Type type, int length) {
+    return addColumn(name, CatalogUtil.newDataTypeWithLen(type, length));
+  }
+
+  public synchronized Partitions addColumn(String name, TajoDataTypes.DataType dataType) {
+    String normalized = name.toLowerCase();
+    if (columnsByQialifiedName.containsKey(normalized)) {
+      LOG.error("Already exists column " + normalized);
+      throw new AlreadyExistsFieldException(normalized);
+    }
+
+    Column newCol = new Column(normalized, dataType);
+    columns.add(newCol);
+    columnsByQialifiedName.put(newCol.getQualifiedName(), columns.size() - 1);
+    columnsByName.put(newCol.getColumnName(), TUtil.newList(columns.size() - 1));
+
+    return this;
+  }
+
+  /**
+   * Adds a column using the qualified name and data type of the given column.
+   * The given Column object itself is not stored.
+   */
+  public synchronized void addColumn(Column column) {
+    addColumn(column.getQualifiedName(), column.getDataType());
+  }
+
+  /** Adds every column of the given partition description to this one. */
+  public synchronized void addColumns(Partitions schema) {
+    for (Column column : schema.getColumns()) {
+      addColumn(column);
+    }
+  }
+
+  /** Appends a specifier, creating the specifier list on first use. */
+  public synchronized void addSpecifier(Specifier specifier) {
+    if(specifiers == null)
+      specifiers = TUtil.newList();
+
+    specifiers.add(specifier);
+  }
+
+  /** Returns the partitioning method; null until it has been set. */
+  public CatalogProtos.PartitionsType getPartitionsType() {
+    return partitionsType;
+  }
+
+  /** Sets the partitioning method. */
+  public void setPartitionsType(CatalogProtos.PartitionsType partitionsType) {
+    this.partitionsType = partitionsType;
+  }
+
+  /** Returns the number of partitions. */
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  /** Sets the number of partitions. */
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  /** Returns the internal specifier list (not a copy); may be null. */
+  public List<Specifier> getSpecifiers() {
+    return specifiers;
+  }
+
+  /** Replaces the specifier list; the given list is stored by reference. */
+  public void setSpecifiers(List<Specifier> specifiers) {
+    this.specifiers = specifiers;
+  }
+
+  /** Returns the internal index from qualified column name to column position (not a copy). */
+  public Map<String, Integer> getColumnsByQialifiedName() {
+    return columnsByQialifiedName;
+  }
+
+  /** Replaces the qualified-name index; the given map is stored by reference. */
+  public void setColumnsByQialifiedName(Map<String, Integer> columnsByQialifiedName) {
+    this.columnsByQialifiedName = columnsByQialifiedName;
+  }
+
+  /** Returns the internal index from simple column name to column positions (not a copy). */
+  public Map<String, List<Integer>> getColumnsByName() {
+    return columnsByName;
+  }
+
+  /** Replaces the simple-name index; the given map is stored by reference. */
+  public void setColumnsByName(Map<String, List<Integer>> columnsByName) {
+    this.columnsByName = columnsByName;
+  }
+
+  public boolean equals(Object o) {
+    if (o instanceof Partitions) {
+      Partitions other = (Partitions) o;
+      return getProto().equals(other.getProto());
+    }
+    return false;
+  }
+
+  /**
+   * Creates a copy with its own protobuf builder, column list, lookup indexes and
+   * specifier list. The Column and Specifier elements themselves are shared with
+   * this object.
+   *
+   * @return the copy
+   * @throws CloneNotSupportedException if {@code Object.clone()} rejects the object
+   */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    Partitions clone = (Partitions) super.clone();
+    clone.builder = CatalogProtos.PartitionsProto.newBuilder();
+    clone.partitionsType = this.partitionsType;
+    clone.numPartitions = this.numPartitions;
+
+    // super.clone() is shallow: without these copies both objects would share the same
+    // list and map instances and a change to one would silently change the other.
+    clone.columns = this.columns != null ? new ArrayList<Column>(this.columns) : null;
+    if (this.columnsByQialifiedName != null) {
+      clone.columnsByQialifiedName = new TreeMap<String, Integer>(this.columnsByQialifiedName);
+    }
+    if (this.columnsByName != null) {
+      clone.columnsByName = new HashMap<String, List<Integer>>();
+      for (Map.Entry<String, List<Integer>> entry : this.columnsByName.entrySet()) {
+        clone.columnsByName.put(entry.getKey(), new ArrayList<Integer>(entry.getValue()));
+      }
+    }
+
+    // Specifiers are optional and null by default; copying unconditionally would throw
+    // a NullPointerException for every partition description without specifiers.
+    clone.specifiers = this.specifiers != null ? new ArrayList<Specifier>(this.specifiers) : null;
+
+    return clone;
+  }
+
+  /**
+   * Serializes this partition description into its protobuf form.
+   * The internal builder is reused between calls, so the repeated fields are cleared
+   * before they are filled again.
+   *
+   * @return the protobuf message describing these partitions
+   */
+  @Override
+  public CatalogProtos.PartitionsProto getProto() {
+    if (builder == null) {
+      builder = CatalogProtos.PartitionsProto.newBuilder();
+    }
+    if (this.partitionsType != null) {
+      builder.setPartitionsType(this.partitionsType);
+    }
+    builder.clearColumns();
+    if (this.columns != null) {
+      for (Column col : columns) {
+        builder.addColumns(col.getProto());
+      }
+    }
+    builder.setNumPartitions(numPartitions);
+
+    // Specifiers is a repeated field as well: without clearing it, every call would
+    // append the specifiers again and consecutive results would differ.
+    builder.clearSpecifiers();
+    if (this.specifiers != null) {
+      for (Specifier specifier : specifiers) {
+        builder.addSpecifiers(specifier.getProto());
+      }
+    }
+    return builder.build();
+  }
+
+  /** Returns a pretty-printed JSON rendering of the fields annotated with {@code @Expose}. */
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().
+        excludeFieldsWithoutExposeAnnotation().create();
+    return gson.toJson(this);
+  }
+
+  /** Serializes this object to JSON through {@code CatalogGsonHelper}. */
+  @Override
+  public String toJson() {
+    return CatalogGsonHelper.toJson(this, Partitions.class);
+
+  }
+
+  /** Returns the partition columns as a newly allocated array. */
+  public Column[] toArray() {
+    return this.columns.toArray(new Column[this.columns.size()]);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
new file mode 100644
index 0000000..feb8a33
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Specifier.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.base.Objects;
+import com.google.gson.Gson;
+import com.google.gson.annotations.Expose;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+/**
+ * A single partition specifier: an optional partition name together with the
+ * expressions that describe the partition. Both values may be null.
+ */
+public class Specifier implements ProtoObject<CatalogProtos.SpecifierProto>, Cloneable,
+    GsonObject {
+
+  private static final Log LOG = LogFactory.getLog(Specifier.class);
+  protected CatalogProtos.SpecifierProto.Builder builder = null;
+
+
+  @Expose protected String name;
+  @Expose protected String expressions;
+
+  /** Creates a specifier with neither a name nor expressions. */
+  public Specifier() {
+    builder = CatalogProtos.SpecifierProto.newBuilder();
+  }
+
+  /** Creates a specifier that only has a name. */
+  public Specifier(String name) {
+    this();
+    this.name = name;
+  }
+
+  /** Creates a specifier with a name and its expressions. */
+  public Specifier(String name, String expressions) {
+    this();
+    this.name = name;
+    this.expressions = expressions;
+  }
+
+  /**
+   * Rebuilds a specifier from its protobuf form. The name is lower-cased.
+   * Fields that are absent from the message stay null, so that a specifier
+   * without a name or expressions survives a getProto() round trip unchanged.
+   *
+   * @param proto the serialized specifier
+   */
+  public Specifier(CatalogProtos.SpecifierProto proto) {
+    this();
+    // The getters of unset optional string fields return "", not null.
+    this.name = proto.hasName() ? proto.getName().toLowerCase() : null;
+    this.expressions = proto.hasExpressions() ? proto.getExpressions() : null;
+  }
+
+  /** Returns the partition name; may be null. */
+  public String getName() {
+    return name;
+  }
+
+  /** Sets the partition name. */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /** Returns the partition expressions; may be null. */
+  public String getExpressions() {
+    return expressions;
+  }
+
+  /** Sets the partition expressions. */
+  public void setExpressions(String expressions) {
+    this.expressions = expressions;
+  }
+
+  /** Two specifiers are equal when both the name and the expressions are equal. */
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Specifier) {
+      Specifier other = (Specifier)o;
+      boolean eq = TUtil.checkEquals(this.name, other.name);
+      eq = eq && TUtil.checkEquals(this.expressions, other.expressions);
+      return  eq;
+    }
+    return false;
+  }
+
+  /** Hash code over the same two values that equals() compares. */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(this.name, this.expressions);
+
+  }
+
+  /** Returns a copy that has its own protobuf builder. */
+  @Override
+  public Object clone() throws CloneNotSupportedException {
+    Specifier clone = (Specifier) super.clone();
+    clone.builder = CatalogProtos.SpecifierProto.newBuilder();
+    clone.name = this.name;
+    clone.expressions = this.expressions;
+    return clone;
+  }
+
+  /** Returns a pretty-printed JSON rendering of this specifier. */
+  @Override
+  public String toString() {
+    Gson gson = CatalogGsonHelper.getPrettyInstance();
+    return gson.toJson(this);
+  }
+
+  /**
+   * Serializes this specifier into its protobuf form. The internal builder is reused
+   * between calls, so a field that is currently null is cleared explicitly; otherwise
+   * the message would still carry the value from an earlier call.
+   *
+   * @return the protobuf message for this specifier
+   */
+  @Override
+  public CatalogProtos.SpecifierProto getProto() {
+    if(builder == null) {
+      builder = CatalogProtos.SpecifierProto.newBuilder();
+    }
+
+    if(this.name != null) {
+      builder.setName(this.name);
+    } else {
+      builder.clearName();
+    }
+
+    if(this.expressions != null) {
+      builder.setExpressions(this.expressions);
+    } else {
+      builder.clearExpressions();
+    }
+
+    return builder.build();
+  }
+
+  /** Serializes this object to JSON through {@code CatalogGsonHelper}. */
+  @Override
+  public String toJson() {
+    return CatalogGsonHelper.toJson(this, Specifier.class);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index acfa4fb..e5af491 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -50,6 +50,13 @@ enum CompressType {
     LZ = 6;
 }
 
+// Partitioning methods that can be recorded for a table in the catalog.
+enum PartitionsType {
+    RANGE = 0;
+    HASH = 1;
+    LIST = 2;
+    COLUMN = 3;
+}
+
 message ColumnMetaProto {
     required DataType dataType = 1;
     required bool compressed = 2;
@@ -106,6 +113,7 @@ message TableDescProto {
 	required TableProto meta = 3;
 	required SchemaProto schema = 4;
 	optional TableStatsProto stats = 5;
+	optional PartitionsProto partitions = 6;
 }
 
 enum FunctionType {
@@ -225,3 +233,15 @@ message SortSpecProto {
   optional bool ascending = 2 [default = true];
   optional bool nullFirst = 3 [default = false];
 }
+
+// Serialized form of org.apache.tajo.catalog.partition.Partitions.
+message PartitionsProto {
+  required PartitionsType partitionsType = 1; // partitioning method
+  repeated ColumnProto columns = 2;           // partition columns
+  optional int32 numPartitions = 3;           // number of partitions
+  repeated SpecifierProto specifiers = 4;     // per-partition specifiers
+}
+
+// Serialized form of org.apache.tajo.catalog.partition.Specifier; both fields are optional.
+message SpecifierProto {
+	optional string name = 1;
+	optional string expressions = 2;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/pom.xml b/tajo-catalog/tajo-catalog-server/pom.xml
index ca8a3ef..e1105eb 100644
--- a/tajo-catalog/tajo-catalog-server/pom.xml
+++ b/tajo-catalog/tajo-catalog-server/pom.xml
@@ -127,6 +127,10 @@
       <groupId>org.apache.tajo</groupId>
       <artifactId>tajo-rpc</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-algebra</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>com.google.protobuf</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index dc279bc..e6566af 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -253,9 +253,12 @@ public class CatalogServer extends AbstractService {
         descBuilder.setMeta(tableDesc.getMeta());
         descBuilder.setSchema(tableDesc.getSchema());
 
+        if( tableDesc.getPartitions() != null
+            && !tableDesc.getPartitions().toString().isEmpty()) {
+          descBuilder.setPartitions(tableDesc.getPartitions());
+        }
 
         store.addTable(new TableDesc(descBuilder.build()));
-
       } catch (IOException ioe) {
         LOG.error(ioe.getMessage(), ioe);
         return BOOL_FALSE;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 6a76794..3414e83 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -26,6 +26,9 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -37,6 +40,8 @@ import org.apache.tajo.exception.InternalException;
 import java.io.IOException;
 import java.sql.*;
 import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -47,6 +52,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
   protected String connectionPassword;
   protected String catalogUri;
   private Connection conn;
+  protected Map<String, Boolean> baseTableMaps = new HashMap<String, Boolean>();
 
   protected static final int VERSION = 1;
 
@@ -202,6 +208,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
   @Override
   public void addTable(final TableDesc table) throws IOException {
     Statement stmt = null;
+    PreparedStatement pstmt = null;
+
     ResultSet res;
 
     String sql =
@@ -245,7 +253,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
 
 
       String optSql = String.format("INSERT INTO %s (%s, key_, value_) VALUES(?, ?, ?)", TB_OPTIONS, C_TABLE_ID);
-      PreparedStatement pstmt = getConnection().prepareStatement(optSql);
+      pstmt = getConnection().prepareStatement(optSql);
       try {
         for (Entry<String, String> entry : table.getMeta().toMap().entrySet()) {
           pstmt.setString(1, table.getName());
@@ -272,9 +280,94 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
         stmt.addBatch(sql);
         stmt.executeBatch();
       }
+
+      //Partition
+      if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
+        try {
+          Partitions partitions = table.getPartitions();
+          List<Column> columnList = partitions.getColumns();
+
+          // Find columns which used for a partitioned table.
+          StringBuffer columns = new StringBuffer();
+          for(Column eachColumn : columnList) {
+            sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
+                + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(sql);
+            }
+            res = stmt.executeQuery(sql);
+            if (!res.next()) {
+              throw new IOException("ERROR: there is no columnId matched to "
+                  + table.getName());
+            }
+            columnId = res.getInt("column_id");
+
+            if (columns.length() > 0) {
+              columns.append(",");
+            }
+            columns.append(columnId);
+          }
+
+          // Set default partition name. But if user named to subpartition, it would be updated.
+//          String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
+
+          sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
+              + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
+          pstmt = getConnection().prepareStatement(sql);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sql);
+          }
+
+          // Find information for subpartitions
+          if (partitions.getSpecifiers() != null) {
+            int count = 1;
+            if (partitions.getSpecifiers().size() == 0) {
+              pstmt.clearParameters();
+              pstmt.setString(1, null);
+              pstmt.setInt(2, tid);
+              pstmt.setString(3, partitions.getPartitionsType().name());
+              pstmt.setInt(4, partitions.getNumPartitions());
+              pstmt.setString(5, columns.toString());
+              pstmt.setString(6, null);
+              pstmt.addBatch();
+            } else {
+              for(Specifier specifier: partitions.getSpecifiers()) {
+                pstmt.clearParameters();
+                if (specifier.getName() != null && !specifier.getName().equals("")) {
+                  pstmt.setString(1, specifier.getName());
+                } else {
+                  pstmt.setString(1, null);
+                }
+                pstmt.setInt(2, tid);
+                pstmt.setString(3, partitions.getPartitionsType().name());
+                pstmt.setInt(4, partitions.getNumPartitions());
+                pstmt.setString(5, columns.toString());
+                pstmt.setString(6, specifier.getExpressions());
+                pstmt.addBatch();
+                count++;
+              }
+            }
+          } else {
+            pstmt.clearParameters();
+            pstmt.setString(1, null);
+            pstmt.setInt(2, tid);
+            pstmt.setString(3, partitions.getPartitionsType().name());
+            pstmt.setInt(4, partitions.getNumPartitions());
+            pstmt.setString(5, columns.toString());
+            pstmt.setString(6, null);
+            pstmt.addBatch();
+          }
+          pstmt.executeBatch();
+        } finally {
+          CatalogUtil.closeSQLWrapper(pstmt);
+        }
+      }
+
     } catch (SQLException se) {
       throw new IOException(se.getMessage(), se);
     } finally {
+      CatalogUtil.closeSQLWrapper(pstmt);
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
@@ -365,6 +458,20 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       CatalogUtil.closeSQLWrapper(stmt);
     }
 
+
+    try {
+      sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
+        + " SELECT TID FROM " + TB_TABLES
+        + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+      LOG.info(sql);
+      stmt = getConnection().createStatement();
+      stmt.execute(sql);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(stmt);
+    }
+
     try {
       sql = "DELETE FROM " + TB_TABLES +
           " WHERE " + C_TABLE_ID + " = '" + name + "'";
@@ -376,6 +483,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     } finally {
       CatalogUtil.closeSQLWrapper(stmt);
     }
+
   }
 
   @Override
@@ -388,10 +496,12 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     StoreType storeType = null;
     Options options;
     TableStats stat = null;
+    Partitions partitions = null;
+    int tid = 0;
 
     try {
       String sql =
-          "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+          "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
               + " WHERE " + C_TABLE_ID + "='" + name + "'";
       if (LOG.isDebugEnabled()) {
         LOG.debug(sql);
@@ -404,6 +514,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       tableName = res.getString(C_TABLE_ID).trim();
       path = new Path(res.getString("path").trim());
       storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+      tid = res.getInt("TID");
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
@@ -481,15 +592,84 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
 
+    try {
+      String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
+          + " WHERE TID =" + tid + "";
+      stmt = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+
+      while (res.next()) {
+        if (partitions == null) {
+          partitions = new Partitions();
+          String[] columns = res.getString("columns").split(",");
+          for(String eachColumn: columns) {
+            partitions.addColumn(getColumn(tableName, tid, eachColumn));
+          }
+          partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
+          partitions.setNumPartitions(res.getInt("quantity"));
+        }
+
+        Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
+        partitions.addSpecifier(specifier);
+      }
+
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt);
+    }
+
     TableMeta meta = new TableMeta(storeType, options);
     TableDesc table = new TableDesc(tableName, schema, meta, path);
     if (stat != null) {
       table.setStats(stat);
     }
 
+    if (partitions != null) {
+      table.setPartitions(partitions);
+    }
+
     return table;
   }
 
+  /**
+   * Loads one column of a table from the columns catalog table.
+   *
+   * @param tableName the table name; used as the qualifier of the returned column
+   * @param tid       the internal table id
+   * @param columnId  the column id as text, as stored in the partition record
+   * @return the column, named {@code tableName.columnName}
+   * @throws IOException if the column id is not a number, the column does not exist,
+   *                     or the catalog database reports an error
+   */
+  private Column getColumn(String tableName, int tid, String columnId) throws IOException {
+    ResultSet res = null;
+    PreparedStatement pstmt = null;
+
+    try {
+      // Bind the ids as parameters instead of concatenating them into the statement.
+      String sql = "SELECT column_name, data_type, type_length from "
+          + TB_COLUMNS + " WHERE TID = ? AND column_id = ?";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+
+      pstmt = getConnection().prepareStatement(sql);
+      pstmt.setInt(1, tid);
+      pstmt.setInt(2, Integer.parseInt(columnId.trim()));
+      res = pstmt.executeQuery();
+
+      if (!res.next()) {
+        // The caller registers the result as a partition column right away; fail here
+        // with a clear message instead of handing back null.
+        throw new IOException("ERROR: there is no column matched to column_id "
+            + columnId + " of " + tableName);
+      }
+
+      String columnName = tableName + "."
+          + res.getString("column_name").trim();
+      Type dataType = getDataType(res.getString("data_type")
+          .trim());
+      int typeLength = res.getInt("type_length");
+      if (typeLength > 0) {
+        return new Column(columnName, dataType, typeLength);
+      }
+      return new Column(columnName, dataType);
+    } catch (NumberFormatException nfe) {
+      throw new IOException("ERROR: invalid column_id '" + columnId + "' of " + tableName, nfe);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, pstmt);
+    }
+  }
+
   private Type getDataType(final String typeStr) {
     try {
       return Enum.valueOf(Type.class, typeStr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index 6fee78b..06a701b 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -24,6 +24,9 @@ package org.apache.tajo.catalog.store;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -38,6 +41,7 @@ import java.sql.*;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -68,143 +72,177 @@ public class DerbyStore extends AbstractDBStore {
     try {
       // META
       stmt = getConnection().createStatement();
-      String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(meta_ddl);
+
+      if (!baseTableMaps.get(TB_META)) {
+        String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(meta_ddl);
+        }
+        stmt.executeUpdate(meta_ddl);
+        LOG.info("Table '" + TB_META + " is created.");
       }
-      stmt.executeUpdate(meta_ddl);
-      LOG.info("Table '" + TB_META + " is created.");
 
       // TABLES
-      String tables_ddl = "CREATE TABLE "
-          + TB_TABLES + " ("
-          + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
-          + "path VARCHAR(1024), "
-          + "store_type CHAR(16), "
-          + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
-          ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(tables_ddl);
-      }
-      stmt.addBatch(tables_ddl);
-      String idx_tables_tid = 
-          "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_tables_tid);
-      }
-      stmt.addBatch(idx_tables_tid);
-      
-      String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on " 
-          + TB_TABLES + "(" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_tables_name);
+      if (!baseTableMaps.get(TB_TABLES)) {
+        String tables_ddl = "CREATE TABLE "
+            + TB_TABLES + " ("
+            + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
+            + "path VARCHAR(1024), "
+            + "store_type CHAR(16), "
+            + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
+            ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(tables_ddl);
+        }
+        stmt.addBatch(tables_ddl);
+
+        String idx_tables_tid =
+            "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_tables_tid);
+        }
+        stmt.addBatch(idx_tables_tid);
+
+        String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on "
+            + TB_TABLES + "(" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_tables_name);
+        }
+        stmt.addBatch(idx_tables_name);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_TABLES + "' is created.");
       }
-      stmt.addBatch(idx_tables_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_TABLES + "' is created.");
 
       // COLUMNS
-      String columns_ddl = 
-          "CREATE TABLE " + TB_COLUMNS + " ("
-          + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
-          + C_TABLE_ID + ") ON DELETE CASCADE, "
-          + "column_id INT NOT NULL,"
-          + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
-          + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(columns_ddl);
-      }
-      stmt.addBatch(columns_ddl);
+      if (!baseTableMaps.get(TB_COLUMNS)) {
+        String columns_ddl =
+            "CREATE TABLE " + TB_COLUMNS + " ("
+                + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES " + TB_TABLES + "("
+                + C_TABLE_ID + ") ON DELETE CASCADE, "
+                + "column_id INT NOT NULL,"
+                + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+                + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(columns_ddl);
+        }
+        stmt.addBatch(columns_ddl);
 
-      String idx_fk_columns_table_name = 
-          "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
-          + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_fk_columns_table_name);
+        String idx_fk_columns_table_name =
+            "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
+                + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_fk_columns_table_name);
+        }
+        stmt.addBatch(idx_fk_columns_table_name);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_COLUMNS + " is created.");
       }
-      stmt.addBatch(idx_fk_columns_table_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_COLUMNS + " is created.");
 
       // OPTIONS
-      String options_ddl = 
-          "CREATE TABLE " + TB_OPTIONS +" ("
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
-          + "ON DELETE CASCADE, "
-          + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(options_ddl);
-      }
-      stmt.addBatch(options_ddl);
-      
-      String idx_options_key = 
-          "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_options_key);
-      }
-      stmt.addBatch(idx_options_key);
-      String idx_options_table_name = 
-          "CREATE INDEX idx_options_table_name on " + TB_OPTIONS 
-          + "(" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_options_table_name);
+      if (!baseTableMaps.get(TB_OPTIONS)) {
+        String options_ddl =
+            "CREATE TABLE " + TB_OPTIONS +" ("
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
+                + "ON DELETE CASCADE, "
+                + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(options_ddl);
+        }
+        stmt.addBatch(options_ddl);
+
+        String idx_options_key =
+            "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_options_key);
+        }
+        stmt.addBatch(idx_options_key);
+        String idx_options_table_name =
+            "CREATE INDEX idx_options_table_name on " + TB_OPTIONS
+                + "(" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_options_table_name);
+        }
+        stmt.addBatch(idx_options_table_name);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_OPTIONS + " is created.");
       }
-      stmt.addBatch(idx_options_table_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_OPTIONS + " is created.");
-      
+
       // INDEXES
-      String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
-          + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
-          + "ON DELETE CASCADE, "
-          + "column_name VARCHAR(255) NOT NULL, "
-          + "data_type VARCHAR(255) NOT NULL, "
-          + "index_type CHAR(32) NOT NULL, "
-          + "is_unique BOOLEAN NOT NULL, "
-          + "is_clustered BOOLEAN NOT NULL, "
-          + "is_ascending BOOLEAN NOT NULL)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(indexes_ddl);
+      if (!baseTableMaps.get(TB_INDEXES)) {
+        String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
+            + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+            + "ON DELETE CASCADE, "
+            + "column_name VARCHAR(255) NOT NULL, "
+            + "data_type VARCHAR(255) NOT NULL, "
+            + "index_type CHAR(32) NOT NULL, "
+            + "is_unique BOOLEAN NOT NULL, "
+            + "is_clustered BOOLEAN NOT NULL, "
+            + "is_ascending BOOLEAN NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(indexes_ddl);
+        }
+        stmt.addBatch(indexes_ddl);
+
+        String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON "
+            + TB_INDEXES + " (index_name)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_indexes_key);
+        }
+        stmt.addBatch(idx_indexes_key);
+
+        String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON "
+            + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_indexes_columns);
+        }
+        stmt.addBatch(idx_indexes_columns);
+        stmt.executeBatch();
+        LOG.info("Table '" + TB_INDEXES + "' is created.");
       }
-      stmt.addBatch(indexes_ddl);
-      
-      String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON " 
-          + TB_INDEXES + " (index_name)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_indexes_key);
-      }      
-      stmt.addBatch(idx_indexes_key);
-      
-      String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON " 
-          + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_indexes_columns);
-      } 
-      stmt.addBatch(idx_indexes_columns);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_INDEXES + "' is created.");
 
-      String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
-          + "ON DELETE CASCADE, "
-          + "num_rows BIGINT, "
-          + "num_bytes BIGINT)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(stats_ddl);
+      if (!baseTableMaps.get(TB_STATISTICS)) {
+        String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+            + "ON DELETE CASCADE, "
+            + "num_rows BIGINT, "
+            + "num_bytes BIGINT)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(stats_ddl);
+        }
+        stmt.addBatch(stats_ddl);
+
+        String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
+            + TB_STATISTICS + " (" + C_TABLE_ID + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(idx_stats_fk_table_name);
+        }
+        stmt.addBatch(idx_stats_fk_table_name);
+        LOG.info("Table '" + TB_STATISTICS + "' is created.");
       }
-      stmt.addBatch(stats_ddl);
 
-      String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
-          + TB_STATISTICS + " (" + C_TABLE_ID + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(idx_stats_fk_table_name);
+      // PARTITION
+      if (!baseTableMaps.get(TB_PARTTIONS)) {
+        String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+            + "PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+            + "name VARCHAR(255), "
+            + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+            + "type VARCHAR(10) NOT NULL,"
+            + "quantity INT ,"
+            + "columns VARCHAR(255),"
+            + "expressions VARCHAR(1024)"
+            + ", CONSTRAINT PARTITION_PK PRIMARY KEY (PID)"
+            + "   )";
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(partition_ddl);
+        }
+        stmt.addBatch(partition_ddl);
+        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+        stmt.executeBatch();
       }
-      stmt.addBatch(idx_stats_fk_table_name);
-      stmt.executeBatch();
-      LOG.info("Table '" + TB_STATISTICS + "' is created.");
 
     } finally {
       wlock.unlock();
@@ -220,26 +258,34 @@ public class DerbyStore extends AbstractDBStore {
 
     wlock.lock();
     ResultSet res = null;
+    int foundCount = 0;
     try {
-      boolean found = false;
       res = getConnection().getMetaData().getTables(null, null, null,
           new String [] {"TABLE"});
-      
-      String resName;
-      while (res.next() && !found) {
-        resName = res.getString("TABLE_NAME");
-        if (TB_META.equals(resName)
-            || TB_TABLES.equals(resName)
-            || TB_COLUMNS.equals(resName)
-            || TB_OPTIONS.equals(resName)) {
-            return true;
-        }
+
+      baseTableMaps.put(TB_META, false);
+      baseTableMaps.put(TB_TABLES, false);
+      baseTableMaps.put(TB_COLUMNS, false);
+      baseTableMaps.put(TB_OPTIONS, false);
+      baseTableMaps.put(TB_STATISTICS, false);
+      baseTableMaps.put(TB_INDEXES, false);
+      baseTableMaps.put(TB_PARTTIONS, false);
+
+      while (res.next()) {
+        baseTableMaps.put(res.getString("TABLE_NAME"), true);
       }
     } finally {
       wlock.unlock();
       CatalogUtil.closeSQLWrapper(res);
-    }    
-    return false;
+    }
+
+    for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
+      if (!entry.getValue()) {
+        return false;
+      }
+    }
+
+    return true;
   }
   
   final boolean checkInternalTable(final String tableName) throws SQLException {
@@ -263,6 +309,7 @@ public class DerbyStore extends AbstractDBStore {
   
   @Override
   public final void addTable(final TableDesc table) throws IOException {
+    PreparedStatement pstmt = null;
     Statement stmt = null;
     ResultSet res = null;
 
@@ -324,10 +371,95 @@ public class DerbyStore extends AbstractDBStore {
         }
         stmt.executeUpdate(sql);
       }
+
+      //Partition
+      if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
+        try {
+          Partitions partitions = table.getPartitions();
+          List<Column> columnList = partitions.getColumns();
+
+          // Find the columns that are used to partition this table.
+          StringBuffer columns = new StringBuffer();
+          for(Column eachColumn : columnList) {
+            sql = "SELECT column_id from " + TB_COLUMNS + " WHERE TID "
+                + " = " + tid + " AND column_name = '" + eachColumn.getColumnName() + "'";
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(sql);
+            }
+            res = stmt.executeQuery(sql);
+            if (!res.next()) {
+              throw new IOException("ERROR: there is no columnId matched to "
+                  + table.getName());
+            }
+            columnId = res.getInt("column_id");
+
+            if(columns.length() > 0) {
+              columns.append(",");
+            }
+            columns.append(columnId);
+          }
+
+          // Default partition name (currently disabled below); a user-supplied subpartition name would replace it.
+//          String partitionName = partitions.getPartitionsType().name() + "_" + table.getName();
+
+          sql = "INSERT INTO " + TB_PARTTIONS + " (name, TID, "
+              + " type, quantity, columns, expressions) VALUES (?, ?, ?, ?, ?, ?) ";
+          pstmt = getConnection().prepareStatement(sql);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(sql);
+          }
+
+          // Find information for subpartitions
+          if (partitions.getSpecifiers() != null) {
+            int count = 1;
+            if (partitions.getSpecifiers().size() == 0) {
+              pstmt.clearParameters();
+              pstmt.setString(1, null);
+              pstmt.setInt(2, tid);
+              pstmt.setString(3, partitions.getPartitionsType().name());
+              pstmt.setInt(4, partitions.getNumPartitions());
+              pstmt.setString(5, columns.toString());
+              pstmt.setString(6, null);
+              pstmt.addBatch();
+            } else {
+              for(Specifier eachValue: partitions.getSpecifiers()) {
+                pstmt.clearParameters();
+                if (eachValue.getName() != null && !eachValue.getName().equals("")) {
+                  pstmt.setString(1, eachValue.getName());
+                } else {
+                  pstmt.setString(1, null);
+                }
+                pstmt.setInt(2, tid);
+                pstmt.setString(3, partitions.getPartitionsType().name());
+                pstmt.setInt(4, partitions.getNumPartitions());
+                pstmt.setString(5, columns.toString());
+                pstmt.setString(6, eachValue.getExpressions());
+                pstmt.addBatch();
+                count++;
+              }
+            }
+          } else {
+            pstmt.clearParameters();
+            pstmt.setString(1, null);
+            pstmt.setInt(2, tid);
+            pstmt.setString(3, partitions.getPartitionsType().name());
+            pstmt.setInt(4, partitions.getNumPartitions());
+            pstmt.setString(5, columns.toString());
+            pstmt.setString(6, null);
+            pstmt.addBatch();
+          }
+          pstmt.executeBatch();
+        } finally {
+          CatalogUtil.closeSQLWrapper(pstmt);
+        }
+      }
+
     } catch (SQLException se) {
       throw new IOException(se.getMessage(), se);
     } finally {
       wlock.unlock();
+      CatalogUtil.closeSQLWrapper(res, pstmt);
       CatalogUtil.closeSQLWrapper(res, stmt);
     }
   }
@@ -428,6 +560,17 @@ public class DerbyStore extends AbstractDBStore {
       }
 
       try {
+        sql = "DELETE FROM " + TB_PARTTIONS + " WHERE TID IN ("
+            + " SELECT TID FROM " + TB_TABLES
+            + " WHERE " + C_TABLE_ID + " = '" + name + "' )";
+        LOG.info(sql);
+        stmt = getConnection().createStatement();
+        stmt.execute(sql);
+      } catch (SQLException se) {
+        throw new IOException(se);
+      }
+
+      try {
         sql = "DELETE FROM " + TB_TABLES +
             " WHERE " + C_TABLE_ID +" = '" + name + "'";
         LOG.info(sql);
@@ -435,7 +578,6 @@ public class DerbyStore extends AbstractDBStore {
       } catch (SQLException se) {
         throw new IOException(se);
       }
-
     } catch (SQLException se) {
       throw new IOException(se);
     } finally {
@@ -454,6 +596,8 @@ public class DerbyStore extends AbstractDBStore {
     StoreType storeType = null;
     Options options;
     TableStats stat = null;
+    Partitions partitions = null;
+    int tid = 0;
 
     try {
       rlock.lock();
@@ -461,7 +605,7 @@ public class DerbyStore extends AbstractDBStore {
 
       try {
         String sql = 
-            "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+            "SELECT " + C_TABLE_ID + ", path, store_type, TID from " + TB_TABLES
             + " WHERE " + C_TABLE_ID + "='" + name + "'";
         if (LOG.isDebugEnabled()) {
           LOG.debug(sql);
@@ -474,6 +618,7 @@ public class DerbyStore extends AbstractDBStore {
         tableName = res.getString(C_TABLE_ID).trim();
         path = new Path(res.getString("path").trim());
         storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+        tid = res.getInt("TID");
       } catch (SQLException se) { 
         throw new IOException(se);
       } finally {
@@ -550,12 +695,48 @@ public class DerbyStore extends AbstractDBStore {
         CatalogUtil.closeSQLWrapper(res);
       }
 
+
+      try {
+        String sql = "SELECT name, type, quantity, columns, expressions from " + TB_PARTTIONS
+            + " WHERE TID =" + tid + "";
+        stmt = getConnection().createStatement();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        res = stmt.executeQuery(sql);
+
+        while (res.next()) {
+          if (partitions == null) {
+            partitions = new Partitions();
+            String[] columns = res.getString("columns").split(",");
+            for(String eachColumn: columns) {
+              partitions.addColumn(getColumn(tableName, tid, eachColumn));
+            }
+            partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
+                ("type")));
+            partitions.setNumPartitions(res.getInt("quantity"));
+          }
+
+          Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
+          partitions.addSpecifier(specifier);
+        }
+
+      } catch (SQLException se) {
+        throw new IOException(se);
+      } finally {
+        CatalogUtil.closeSQLWrapper(res, stmt);
+      }
+
       TableMeta meta = new TableMeta(storeType, options);
       TableDesc table = new TableDesc(tableName, schema, meta, path);
       if (stat != null) {
         table.setStats(stat);
       }
 
+      if (partitions != null) {
+        table.setPartitions(partitions);
+      }
+
       return table;
     } catch (SQLException se) {
       throw new IOException(se);
@@ -564,7 +745,42 @@ public class DerbyStore extends AbstractDBStore {
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
-  
+
+  private Column getColumn(String tableName, int tid, String columnId) throws IOException { // loads one column of table tid by its column_id
+    ResultSet res = null;
+    Column column = null; // stays null (and is returned as null) when no row matches
+    Statement stmt = null;
+
+    try {
+      String sql = "SELECT column_name, data_type, type_length from "
+          + TB_COLUMNS + " WHERE TID = " + tid + " AND column_id = " + columnId; // NOTE(review): columnId is concatenated into the SQL as-is; a PreparedStatement would be safer
+
+      stmt = getConnection().createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+
+      if (res.next()) { // only the first matching row is used
+        String columnName = tableName + "."
+            + res.getString("column_name").trim(); // column name is qualified with the table name
+        Type dataType = getDataType(res.getString("data_type")
+            .trim());
+        int typeLength = res.getInt("type_length");
+        if (typeLength > 0) { // a positive type_length is passed on to the Column; otherwise it is omitted
+          column = new Column(columnName, dataType, typeLength);
+        } else {
+          column = new Column(columnName, dataType);
+        }
+      }
+    } catch (SQLException se) {
+      throw new IOException(se); // SQL failures are rethrown as IOException with the cause preserved
+    } finally {
+      CatalogUtil.closeSQLWrapper(res, stmt); // runs on both the success and the failure path
+    }
+    return column;
+  }
+
   private Type getDataType(final String typeStr) {
     try {
     return Enum.valueOf(Type.class, typeStr);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/0b0de13b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
index 259d9d6..c368969 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MySQLStore.java
@@ -29,6 +29,7 @@ import org.apache.tajo.exception.InternalException;
 import java.io.IOException;
 import java.sql.*;
 import java.util.List;
+import java.util.Map;
 
 public class MySQLStore extends AbstractDBStore  {
 
@@ -51,115 +52,158 @@ public class MySQLStore extends AbstractDBStore  {
   // TODO - DDL and index statements should be renamed
   protected void createBaseTable() throws SQLException {
 
-    // META
-    Statement stmt = getConnection().createStatement();
-    String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(meta_ddl);
-    }
+    int result; // NOTE(review): assigned by every executeUpdate below but never read in this method
+    Statement stmt = null;
     try {
-      int result = stmt.executeUpdate(meta_ddl);
-      LOG.info("Table '" + TB_META + " is created.");
+      stmt = getConnection().createStatement(); // one Statement is reused for all DDL below and closed in finally
+
+      // META
+      if (!baseTableMaps.get(TB_META)) { // create only tables flagged false; assumes isInitialized() populated baseTableMaps first (a missing key would fail on unboxing) — TODO confirm
+        String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(meta_ddl);
+        }
+        result = stmt.executeUpdate(meta_ddl);
+        LOG.info("Table '" + TB_META + " is created.");
+      }
 
       // TABLES
-      String tables_ddl = "CREATE TABLE "
-          + TB_TABLES + " ("
-          + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
-          + "path TEXT, "
-          + "store_type CHAR(16)"
-          + ")";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(tables_ddl);
+      if (!baseTableMaps.get(TB_TABLES)) {
+        String tables_ddl = "CREATE TABLE "
+            + TB_TABLES + " ("
+            + "TID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL UNIQUE, "
+            + "path TEXT, "
+            + "store_type CHAR(16)"
+            + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(tables_ddl);
+        }
+
+        LOG.info("Table '" + TB_TABLES + "' is created."); // NOTE(review): from here on, "is created" is logged before the DDL runs, so it is emitted even if executeUpdate throws
+        result = stmt.executeUpdate(tables_ddl);
       }
 
-      LOG.info("Table '" + TB_TABLES + "' is created.");
-      result = stmt.executeUpdate(tables_ddl);
       // COLUMNS
+      if (!baseTableMaps.get(TB_COLUMNS)) {
+        String columns_ddl =
+            "CREATE TABLE " + TB_COLUMNS + " ("
+                + "TID INT NOT NULL,"
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+                + "column_id INT NOT NULL,"
+                + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
+                + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
+                + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
+                + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(columns_ddl);
+        }
 
-      String columns_ddl =
-          "CREATE TABLE " + TB_COLUMNS + " ("
-              + "TID INT NOT NULL,"
-              + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-              + "column_id INT NOT NULL,"
-              + "column_name VARCHAR(255) NOT NULL, " + "data_type CHAR(16), " + "type_length INTEGER, "
-              + "UNIQUE KEY(" + C_TABLE_ID + ", column_name),"
-              + "FOREIGN KEY(TID) REFERENCES "+TB_TABLES+"(TID) ON DELETE CASCADE,"
-              + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(columns_ddl);
+        LOG.info("Table '" + TB_COLUMNS + " is created.");
+        result = stmt.executeUpdate(columns_ddl);
       }
 
-      LOG.info("Table '" + TB_COLUMNS + " is created.");
-      result = stmt.executeUpdate(columns_ddl);
       // OPTIONS
-
-      String options_ddl =
-          "CREATE TABLE " + TB_OPTIONS + " ("
-              + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-              + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
-              + "INDEX("+C_TABLE_ID+", key_),"
-              + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(options_ddl);
+      if (!baseTableMaps.get(TB_OPTIONS)) {
+        String options_ddl =
+            "CREATE TABLE " + TB_OPTIONS + " ("
+                + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+                + "key_ VARCHAR(255) NOT NULL, value_ VARCHAR(255) NOT NULL,"
+                + "INDEX("+C_TABLE_ID+", key_),"
+                + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(options_ddl);
+        }
+        LOG.info("Table '" + TB_OPTIONS + " is created.");
+        result = stmt.executeUpdate(options_ddl);
       }
-      LOG.info("Table '" + TB_OPTIONS + " is created.");
-      result = stmt.executeUpdate(options_ddl);
+
       // INDEXES
+      if (!baseTableMaps.get(TB_INDEXES)) {
+        String indexes_ddl = "CREATE TABLE " + TB_INDEXES + "("
+            + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+            + "column_name VARCHAR(255) NOT NULL, "
+            + "data_type VARCHAR(255) NOT NULL, "
+            + "index_type CHAR(32) NOT NULL, "
+            + "is_unique BOOLEAN NOT NULL, "
+            + "is_clustered BOOLEAN NOT NULL, "
+            + "is_ascending BOOLEAN NOT NULL,"
+            + "INDEX(" + C_TABLE_ID + ", column_name),"
+            + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(indexes_ddl);
+        }
+        LOG.info("Table '" + TB_INDEXES + "' is created.");
+        result = stmt.executeUpdate(indexes_ddl);
+      }
 
-      String indexes_ddl = "CREATE TABLE " + TB_INDEXES + "("
-          + "index_name VARCHAR(255) NOT NULL PRIMARY KEY, "
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-          + "column_name VARCHAR(255) NOT NULL, "
-          + "data_type VARCHAR(255) NOT NULL, "
-          + "index_type CHAR(32) NOT NULL, "
-          + "is_unique BOOLEAN NOT NULL, "
-          + "is_clustered BOOLEAN NOT NULL, "
-          + "is_ascending BOOLEAN NOT NULL,"
-          + "INDEX(" + C_TABLE_ID + ", column_name),"
-          + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(indexes_ddl);
+      if (!baseTableMaps.get(TB_STATISTICS)) { // STATISTICS
+        String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+            + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
+            + "num_rows BIGINT, "
+            + "num_bytes BIGINT,"
+            + "INDEX("+C_TABLE_ID+"),"
+            + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(stats_ddl);
+        }
+        LOG.info("Table '" + TB_STATISTICS + "' is created.");
+        result = stmt.executeUpdate(stats_ddl);
       }
-      LOG.info("Table '" + TB_INDEXES + "' is created.");
-      result = stmt.executeUpdate(indexes_ddl);
-
-      String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
-          + C_TABLE_ID + " VARCHAR(255) NOT NULL,"
-          + "num_rows BIGINT, "
-          + "num_bytes BIGINT,"
-          + "INDEX("+C_TABLE_ID+"),"
-          + "FOREIGN KEY("+C_TABLE_ID+") REFERENCES "+TB_TABLES+"("+C_TABLE_ID+") ON DELETE CASCADE)";
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(stats_ddl);
+
+      // PARTITION
+      if (!baseTableMaps.get(TB_PARTTIONS)) {
+        String partition_ddl = "CREATE TABLE " + TB_PARTTIONS + " ("
+            + "PID int NOT NULL AUTO_INCREMENT PRIMARY KEY, "
+            + "name VARCHAR(255), "
+            + "TID INT NOT NULL," // NOTE(review): no FOREIGN KEY / ON DELETE CASCADE on TID here, unlike the TB_TABLES references declared by the tables above
+            + "type VARCHAR(10) NOT NULL,"
+            + "quantity INT ,"
+            + "columns VARCHAR(255),"
+            + "expressions TEXT )";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(partition_ddl);
+        }
+        LOG.info("Table '" + TB_PARTTIONS + "' is created.");
+        result = stmt.executeUpdate(partition_ddl);
       }
-      LOG.info("Table '" + TB_STATISTICS + "' is created.");
-      result = stmt.executeUpdate(stats_ddl);
     } finally {
       CatalogUtil.closeSQLWrapper(stmt);
     }
   }
 
   protected boolean isInitialized() throws SQLException {
-    boolean found = false;
     ResultSet res = getConnection().getMetaData().getTables(null, null, null,
         new String[]{"TABLE"});
 
-    String resName;
     try {
-      while (res.next() && !found) {
-        resName = res.getString("TABLE_NAME");
-        if (TB_META.equals(resName)
-            || TB_TABLES.equals(resName)
-            || TB_COLUMNS.equals(resName)
-            || TB_OPTIONS.equals(resName)) {
-          return true;
-        }
+      baseTableMaps.put(TB_META, false); // start by flagging every catalog table as missing
+      baseTableMaps.put(TB_TABLES, false);
+      baseTableMaps.put(TB_COLUMNS, false);
+      baseTableMaps.put(TB_OPTIONS, false);
+      baseTableMaps.put(TB_STATISTICS, false);
+      baseTableMaps.put(TB_INDEXES, false);
+      baseTableMaps.put(TB_PARTTIONS, false);
+
+      if (res.wasNull()) // NOTE(review): wasNull() is called before any column of res has been read — confirm this check is intended
+        return false;
+
+      while (res.next()) {
+        baseTableMaps.put(res.getString("TABLE_NAME"), true); // every table name reported by the metadata is flagged present, not only the catalog tables
       }
     } finally {
       CatalogUtil.closeSQLWrapper(res);
     }
-    return false;
+
+    for(Map.Entry<String, Boolean> entry : baseTableMaps.entrySet()) {
+      if (!entry.getValue()) { // any entry still false means at least one catalog table is missing
+        return false;
+      }
+    }
+
+    return  true;
+//    return false;
   }
 
   @Override