You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/27 09:29:37 UTC

[2/4] TAJO-475: Table partition catalog recap. (Min Zhou and hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index 9b10f90..4dfc97a 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -21,8 +21,7 @@ package org.apache.tajo.catalog;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.exception.NoSuchFunctionException;
 import org.apache.tajo.catalog.function.Function;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
@@ -246,13 +245,16 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitionDesc.setNumPartitions(2);
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
 
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
@@ -260,9 +262,8 @@ public class TestCatalog {
     TableDesc retrieved = catalog.getTableDesc(tableName);
 
     assertEquals(retrieved.getName(), tableName);
-    assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
-    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
-    assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
+    assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.HASH);
+    assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
 
     catalog.deleteTable(tableName);
     assertFalse(catalog.existsTable(tableName));
@@ -282,17 +283,17 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitionDesc.setNumPartitions(2);
-
-    partitionDesc.addSpecifier(new Specifier("sub_part1"));
-    partitionDesc.addSpecifier(new Specifier("sub_part2"));
-    partitionDesc.addSpecifier(new Specifier("sub_part3"));
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
+
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -300,15 +301,8 @@ public class TestCatalog {
     TableDesc retrieved = catalog.getTableDesc(tableName);
 
     assertEquals(retrieved.getName(), tableName);
-    assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
-    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
-    assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
-        "sub_part1");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
-        "sub_part2");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getName(),
-        "sub_part3");
+    assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.HASH);
+    assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
 
     catalog.deleteTable(tableName);
     assertFalse(catalog.existsTable(tableName));
@@ -327,15 +321,16 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
-
-    partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
-    partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.LIST);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -343,16 +338,8 @@ public class TestCatalog {
     TableDesc retrieved = catalog.getTableDesc(tableName);
 
     assertEquals(retrieved.getName(), tableName);
-    assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.LIST);
-    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
-        "sub_part1");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
-        "Seoul,서울");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
-        "sub_part2");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getExpressions(),
-        "Busan,부산");
+    assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.LIST);
+    assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
 
     catalog.deleteTable(tableName);
     assertFalse(catalog.existsTable(tableName));
@@ -371,16 +358,17 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
 
-    partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
-    partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
-    partitionDesc.addSpecifier(new Specifier("sub_part3"));
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.RANGE);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -388,20 +376,8 @@ public class TestCatalog {
     TableDesc retrieved = catalog.getTableDesc(tableName);
 
     assertEquals(retrieved.getName(), tableName);
-    assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.RANGE);
-    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
-        "sub_part1");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
-        "2");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getName(),
-        "sub_part2");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(1).getExpressions(),
-        "5");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getName(),
-        "sub_part3");
-    assertEquals(retrieved.getPartitions().getSpecifiers().get(2).getExpressions(),
-        "");
+    assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.RANGE);
+    assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
 
     catalog.deleteTable(tableName);
     assertFalse(catalog.existsTable(tableName));
@@ -420,12 +396,16 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -433,8 +413,8 @@ public class TestCatalog {
     TableDesc retrieved = catalog.getTableDesc(tableName);
 
     assertEquals(retrieved.getName(), tableName);
-    assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
-    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
+    assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+    assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getColumnName(), "id");
 
     catalog.deleteTable(tableName);
     assertFalse(catalog.existsTable(tableName));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index 1949afc..502daf0 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,8 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -75,10 +74,10 @@ public class TestDBStore {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
     assertFalse(store.existTable(tableName));
-    store.addTable(desc);
+    store.addTable(desc.getProto());
     assertTrue(store.existTable(tableName));
 
-    TableDesc retrieved = store.getTable(tableName);
+    TableDesc retrieved = new TableDesc(store.getTable(tableName));
     // Schema order check
     assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
     store.deleteTable(tableName);
@@ -105,8 +104,8 @@ public class TestDBStore {
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "gettable"));
     desc.setStats(stat);
 
-    store.addTable(desc);
-    TableDesc retrieved = store.getTable(tableName);
+    store.addTable(desc.getProto());
+    TableDesc retrieved = new TableDesc(store.getTable(tableName));
     assertEquals(",", retrieved.getMeta().getOption("file.delimiter"));
     assertEquals(desc, retrieved);
     assertTrue(957685 == desc.getStats().getNumRows());
@@ -130,7 +129,7 @@ public class TestDBStore {
       TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
       TableDesc desc = new TableDesc(tableName, schema, meta,
           new Path(CommonTestingUtil.getTestDir(), "tableA_" + i));
-      store.addTable(desc);
+      store.addTable(desc.getProto());
     }
     
     assertEquals(numTables, store.getAllTableNames().size());
@@ -139,7 +138,7 @@ public class TestDBStore {
   @Test
   public final void testAddAndDeleteIndex() throws Exception {
     TableDesc table = prepareTable();
-    store.addTable(table);
+    store.addTable(table.getProto());
     
     store.addIndex(TestCatalog.desc1.getProto());
     assertTrue(store.existIndex(TestCatalog.desc1.getName()));
@@ -153,7 +152,7 @@ public class TestDBStore {
   public final void testGetIndex() throws Exception {
     
     TableDesc table = prepareTable();
-    store.addTable(table);
+    store.addTable(table.getProto());
     
     store.addIndex(TestCatalog.desc2.getProto());
     assertEquals(
@@ -168,7 +167,7 @@ public class TestDBStore {
   public final void testGetIndexByTableAndColumn() throws Exception {
     
     TableDesc table = prepareTable();
-    store.addTable(table);
+    store.addTable(table.getProto());
     
     store.addIndex(TestCatalog.desc2.getProto());
     
@@ -186,7 +185,7 @@ public class TestDBStore {
   public final void testGetAllIndexes() throws Exception {
     
     TableDesc table = prepareTable();
-    store.addTable(table);
+    store.addTable(table.getProto());
     
     store.addIndex(TestCatalog.desc1.getProto());
     store.addIndex(TestCatalog.desc2.getProto());
@@ -236,18 +235,21 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitionDesc.setNumPartitions(2);
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(store.existTable(tableName));
-    store.addTable(desc);
+    store.addTable(desc.getProto());
     assertTrue(store.existTable(tableName));
 
-    TableDesc retrieved = store.getTable(tableName);
+    TableDesc retrieved = new TableDesc(store.getTable(tableName));
 
     // Schema order check
     assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -268,22 +270,22 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitionDesc.setNumPartitions(2);
 
-    partitionDesc.addSpecifier(new Specifier("sub_part1"));
-    partitionDesc.addSpecifier(new Specifier("sub_part2"));
-    partitionDesc.addSpecifier(new Specifier("sub_part3"));
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.HASH);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(store.existTable(tableName));
-    store.addTable(desc);
+    store.addTable(desc.getProto());
     assertTrue(store.existTable(tableName));
 
-    TableDesc retrieved = store.getTable(tableName);
+    TableDesc retrieved = new TableDesc(store.getTable(tableName));
 
     // Schema order check
     assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -304,20 +306,21 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
-
-    partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
-    partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.LIST);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(store.existTable(tableName));
-    store.addTable(desc);
+    store.addTable(desc.getProto());
     assertTrue(store.existTable(tableName));
 
-    TableDesc retrieved = store.getTable(tableName);
+    TableDesc retrieved = new TableDesc(store.getTable(tableName));
 
     // Schema order check
     assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -338,21 +341,21 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
-
-    partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
-    partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
-    partitionDesc.addSpecifier(new Specifier("sub_part3"));
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.RANGE);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(store.existTable(tableName));
-    store.addTable(desc);
+    store.addTable(desc.getProto());
     assertTrue(store.existTable(tableName));
 
-    TableDesc retrieved = store.getTable(tableName);
+    TableDesc retrieved = new TableDesc(store.getTable(tableName));
 
     // Schema order check
     assertSchemaOrder(desc.getSchema(), retrieved.getSchema());
@@ -373,17 +376,21 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    PartitionDesc partitionDesc = new PartitionDesc();
-    partitionDesc.addColumn(new Column("id", Type.INT4));
-    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+    PartitionMethodDesc partitionDesc = new PartitionMethodDesc();
+    partitionDesc.setTableId(tableName);
+    partitionDesc.setExpression("id");
+    Schema partSchema = new Schema();
+    partSchema.addColumn("id", Type.INT4);
+    partitionDesc.setExpressionSchema(partSchema);
+    partitionDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitionDesc);
+    desc.setPartitionMethod(partitionDesc);
     assertFalse(store.existTable(tableName));
-    store.addTable(desc);
+    store.addTable(desc.getProto());
     assertTrue(store.existTable(tableName));
 
-    TableDesc retrieved = store.getTable(tableName);
+    TableDesc retrieved = new TableDesc(store.getTable(tableName));
 
     // Schema order check
     assertSchemaOrder(desc.getSchema(), retrieved.getSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
index b4678c0..8bf00e1 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/DescTableCommand.java
@@ -20,9 +20,10 @@ package org.apache.tajo.cli;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
 
 import java.io.PrintWriter;
 import java.util.List;
@@ -99,36 +100,14 @@ public class DescTableCommand extends TajoShellCommand {
     }
 
     sb.append("\n");
-    sb.append("Partitions: \n");
-    if (desc.getPartitions() != null) {
-      sb.append("type:").append(desc.getPartitions().getPartitionsType().name()).append("\n");
-      if (desc.getPartitions().getNumPartitions() > 0)
-        sb.append("numbers:").append(desc.getPartitions().getNumPartitions()).append("\n");
+    if (desc.getPartitionMethod() != null) {
+      PartitionMethodDesc partition = desc.getPartitionMethod();
+      sb.append("Partitions: \n");
 
-      sb.append("columns:").append("\n");
-      for(Column eachColumn: desc.getPartitions().getColumns()) {
-        sb.append("  ");
-        sb.append(eachColumn.getColumnName()).append("\t").append(eachColumn.getDataType().getType());
-        if (eachColumn.getDataType().hasLength()) {
-          sb.append("(").append(eachColumn.getDataType().getLength()).append(")");
-        }
-        sb.append("\n");
-      }
+      sb.append("type:").append(partition.getPartitionType().name()).append("\n");
 
-      if (desc.getPartitions().getSpecifiers() != null) {
-        sb.append("specifier:").append("\n");
-        for(Specifier specifier :desc.getPartitions().getSpecifiers()) {
-          sb.append("  ");
-          sb.append("name:").append(specifier.getName());
-          if (!specifier.getExpressions().equals("")) {
-            sb.append(", expressions:").append(specifier.getExpressions());
-          } else {
-            if (desc.getPartitions().getPartitionsType().name().equals("RANGE"));
-            sb.append(" expressions: MAXVALUE");
-          }
-          sb.append("\n");
-        }
-      }
+      sb.append("columns:"); // single colon, same label style as "type:" (previously printed "columns::")
+      sb.append(TUtil.arrayToString(partition.getExpressionSchema().toArray()));
     }
 
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index d75b5af..c0c9462 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -162,7 +162,7 @@ message CreateTableRequest {
   required SchemaProto schema = 2;
   required TableProto meta = 3;
   required string path = 4;
-  optional PartitionDescProto partitions = 5;
+  optional PartitionMethodProto partition = 5;
 }
 
 message DropTableRequest {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/pom.xml b/tajo-core/tajo-core-backend/pom.xml
index abc217e..125a6d6 100644
--- a/tajo-core/tajo-core-backend/pom.xml
+++ b/tajo-core/tajo-core-backend/pom.xml
@@ -266,7 +266,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
-      <scope>test</scope>
+      <type>test-jar</type>
       <exclusions>
         <exclusion>
           <groupId>commons-el</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 5507ba0..daa8f8d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -28,7 +28,6 @@ import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.Aggregation.GroupType;
 import org.apache.tajo.algebra.LiteralValue.LiteralType;
-import org.apache.tajo.datum.TextDatum;
 import org.apache.tajo.engine.parser.SQLParser.*;
 import org.apache.tajo.storage.CSVFile;
 
@@ -966,9 +965,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     }
 
     if (checkIfExist(ctx.table_partitioning_clauses())) {
-      PartitionDescExpr partitionDesc =
+      PartitionMethodDescExpr partitionMethodDesc =
           parseTablePartitioningClause(ctx.table_partitioning_clauses());
-      createTable.setPartition(partitionDesc);
+      createTable.setPartitionMethod(partitionMethodDesc);
     }
     return createTable;
   }
@@ -985,7 +984,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     return elements;
   }
 
-  public PartitionDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
+  public PartitionMethodDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
 
     if (checkIfExist(ctx.range_partitions())) { // For Range Partition
       Range_partitionsContext rangePartitionsContext = ctx.range_partitions();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
index 3dd371b..d9645a4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/InsertNode.java
@@ -44,13 +44,17 @@ public class InsertNode extends StoreTableNode implements Cloneable {
 
   public void setTargetTable(TableDesc desc) {
     setTableName(desc.getName());
-    tableSchema = desc.getSchema();
+    if (desc.hasPartition()) {
+      tableSchema = desc.getLogicalSchema();
+    } else {
+      tableSchema = desc.getSchema();
+    }
     setPath(desc.getPath());
     setOptions(desc.getMeta().getOptions());
     setStorageType(desc.getMeta().getStoreType());
 
-    if (desc.hasPartitions()) {
-      this.setPartitions(desc.getPartitions());
+    if (desc.hasPartition()) {
+      this.setPartitionMethod(desc.getPartitionMethod());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
index 902cbee..0831b0a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlan.java
@@ -277,7 +277,7 @@ public class LogicalPlan {
       // Trying to find columns from other relations in other blocks
       for (QueryBlock eachBlock : queryBlocks.values()) {
         for (RelationNode rel : eachBlock.getRelations()) {
-          Column found = rel.getOutSchema().getColumnByName(columnRef.getName());
+          Column found = rel.getTableSchema().getColumnByName(columnRef.getName());
           if (found != null) {
             candidates.add(found);
           }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 066a3bb..a5d9787 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -18,7 +18,9 @@
 
 package org.apache.tajo.engine.planner;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -29,8 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.Specifier;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.NullDatum;
@@ -44,7 +45,6 @@ import org.apache.tajo.util.TUtil;
 
 import java.util.*;
 
-import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
 import static org.apache.tajo.algebra.CreateTable.PartitionType;
 import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
 import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
@@ -372,6 +372,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
           }
         }
       }
+    } else if (projectable instanceof RelationNode) {
+      RelationNode relationNode = (RelationNode) projectable;
+      for (Target target : projectable.getTargets()) {
+        Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+        for (Column c : columns) {
+          if (!relationNode.getTableSchema().contains(c)) {
+            throw new PlanningException("Cannot get such a field: " + c);
+          }
+        }
+      }
     } else {
       for (Target target : projectable.getTargets()) {
         Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
@@ -924,7 +934,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     // Assume that each unique expr is evaluated once.
     List<Target> targets = new ArrayList<Target>();
-    for (Column column : scanNode.getInSchema().getColumns()) {
+    for (Column column : scanNode.getTableSchema().getColumns()) {
       ColumnReferenceExpr columnRef = new ColumnReferenceExpr(column.getQualifier(), column.getColumnName());
       if (block.namedExprsMgr.contains(columnRef)) {
         String referenceName = block.namedExprsMgr.getName(columnRef);
@@ -1179,7 +1189,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     } else { // when a user do not specified target columns
 
       // The output schema of select clause determines the target columns.
-      Schema tableSchema = desc.getSchema();
+      Schema tableSchema = desc.getLogicalSchema();
       Schema projectedSchema = insertNode.getChild().getOutSchema();
 
       Schema targetColumns = new Schema();
@@ -1190,8 +1200,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       buildProjectedInsert(insertNode);
     }
 
-    if (desc.hasPartitions()) {
-      insertNode.setPartitions(desc.getPartitions());
+    if (desc.hasPartition()) {
+      insertNode.setPartitionMethod(desc.getPartitionMethod());
     }
     return insertNode;
   }
@@ -1254,223 +1264,122 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   public LogicalNode visitCreateTable(PlanContext context, Stack<Expr> stack, CreateTable expr)
       throws PlanningException {
 
-    // Get a table name to be created.
-    String tableNameTobeCreated = expr.getTableName();
+    CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
+
+    // Set a table name to be created.
+    createTableNode.setTableName(expr.getTableName());
+
+    if (expr.hasStorageType()) { // If storage type (using clause) is specified
+      createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
+    } else { // otherwise, default type
+      createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
+    }
+
+    if (expr.hasParams()) {
+      Options options = new Options();
+      options.putAll(expr.getParams());
+      createTableNode.setOptions(options);
+    }
+
+    if (expr.hasPartition()) {
+      if (expr.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) {
+        createTableNode.setPartitionMethod(getPartitionMethod(context, expr.getTableName(), expr.getPartitionMethod()));
+      } else {
+        throw new PlanningException(String.format("Not supported PartitonType: %s",
+            expr.getPartitionMethod().getPartitionType()));
+      }
+    }
 
     if (expr.hasSubQuery()) { // CREATE TABLE .. AS SELECT
       stack.add(expr);
       LogicalNode subQuery = visit(context, stack, expr.getSubQuery());
       stack.pop();
-      CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
-      createTableNode.setTableName(tableNameTobeCreated);
       createTableNode.setChild(subQuery);
       createTableNode.setInSchema(subQuery.getOutSchema());
 
-      // if no table definition, the select clause's output schema will be used.
-      // ex) CREATE TABLE tbl AS SELECT ...
-      if(!expr.hasTableElements()) {
-
-        expr.setTableElements(convertSchemaToTableElements(subQuery.getOutSchema()));
-      }
-
-      // Otherwise, it uses the defined table elements.
+      // If the table schema is defined
       // ex) CREATE TABLE tbl(col1 type, col2 type) AS SELECT ...
-      createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
-      createTableNode.setSchema(convertTableElementsSchema(expr.getTableElements()));
-
-      if (expr.hasStorageType()) { // If storage type (using clause) is specified
-        createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
-      } else { // If no specified storage type
-        // default type
-        createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
-      }
-
-      if (expr.hasParams()) { // if 'with clause' is specified
-        Options options = new Options();
-        options.putAll(expr.getParams());
-        createTableNode.setOptions(options);
-      }
-
-      if (expr.hasPartition()) { // if 'partition by' is specified
-        createTableNode.setPartitions(convertTableElementsPartition(context, expr));
-      }
+      if (expr.hasTableElements()) {
+        createTableNode.setOutSchema(convertTableElementsSchema(expr.getTableElements()));
+        createTableNode.setTableSchema(convertTableElementsSchema(expr.getTableElements()));
+      } else {
+        // if no table definition, the select clause's output schema will be used.
+        // ex) CREATE TABLE tbl AS SELECT ...
 
-      return createTableNode;
+        if (expr.hasPartition()) {
+          PartitionMethodDesc partitionMethod = createTableNode.getPartitionMethod();
 
-    } else { // if CREATE AN EMPTY TABLE
-      Schema tableSchema;
-      boolean mergedPartition = false;
-      if (expr.hasPartition()) {
-        if (expr.getPartition().getPartitionType().equals(PartitionType.COLUMN)) {
-          if (((ColumnPartition)expr.getPartition()).isOmitValues()) {
-            mergedPartition = true;
+          Schema queryOutputSchema = subQuery.getOutSchema();
+          Schema partitionExpressionSchema = partitionMethod.getExpressionSchema();
+          if (partitionMethod.getPartitionType() == CatalogProtos.PartitionType.COLUMN &&
+              queryOutputSchema.getColumnNum() < partitionExpressionSchema.getColumnNum()) {
+            throw new VerifyException("Partition columns cannot be more than table columns.");
           }
+          Schema tableSchema = new Schema();
+          for (int i = 0; i < queryOutputSchema.getColumnNum() - partitionExpressionSchema.getColumnNum(); i++) {
+            tableSchema.addColumn(queryOutputSchema.getColumn(i));
+          }
+          createTableNode.setOutSchema(tableSchema);
+          createTableNode.setTableSchema(tableSchema);
         } else {
-          throw new PlanningException(String.format("Not supported PartitonType: %s",
-              expr.getPartition().getPartitionType()));
+          createTableNode.setOutSchema(subQuery.getOutSchema());
+          createTableNode.setTableSchema(subQuery.getOutSchema());
         }
       }
 
-      if (mergedPartition) {
-        ColumnDefinition [] merged = TUtil.concat(expr.getTableElements(),
-            ((ColumnPartition)expr.getPartition()).getColumns());
-        tableSchema = convertTableElementsSchema(merged);
-      } else {
-        tableSchema = convertTableElementsSchema(expr.getTableElements());
-      }
+      return createTableNode;
 
-      CreateTableNode createTableNode = context.queryBlock.getNodeFromExpr(expr);
-      createTableNode.setTableName(expr.getTableName());
-      createTableNode.setSchema(tableSchema);
+    } else { // if CREATE AN EMPTY TABLE
+      Schema tableSchema = convertColumnsToSchema(expr.getTableElements());
+      createTableNode.setTableSchema(tableSchema);
 
       if (expr.isExternal()) {
         createTableNode.setExternal(true);
       }
 
-      if (expr.hasStorageType()) {
-        createTableNode.setStorageType(CatalogUtil.getStoreType(expr.getStorageType()));
-      } else {
-        // default type
-        // TODO - it should be configurable.
-        createTableNode.setStorageType(CatalogProtos.StoreType.CSV);
-      }
-
-      if (expr.hasParams()) {
-        Options options = new Options();
-        options.putAll(expr.getParams());
-        createTableNode.setOptions(options);
-      }
-
       if (expr.hasLocation()) {
         createTableNode.setPath(new Path(expr.getLocation()));
       }
 
-      if (expr.hasPartition()) {
-        if (expr.getPartition().getPartitionType().equals(PartitionType.COLUMN)) {
-          createTableNode.setPartitions(convertTableElementsPartition(context, expr));
-        } else {
-          throw new PlanningException(String.format("Not supported PartitonType: %s",
-              expr.getPartition().getPartitionType()));
-        }
-      }
-
       return createTableNode;
     }
   }
 
+  private PartitionMethodDesc getPartitionMethod(PlanContext context,
+                                                 String tableName,
+                                                 CreateTable.PartitionMethodDescExpr expr) throws PlanningException {
+    PartitionMethodDesc partitionMethodDesc = new PartitionMethodDesc();
+    partitionMethodDesc.setTableId(tableName);
+
+    if(expr.getPartitionType() == PartitionType.COLUMN) {
+      CreateTable.ColumnPartition partition = (CreateTable.ColumnPartition) expr;
+      String partitionExpression = Joiner.on(',').join(partition.getColumns());
+      partitionMethodDesc.setPartitionType(CatalogProtos.PartitionType.COLUMN);
+      partitionMethodDesc.setExpression(partitionExpression);
+      partitionMethodDesc.setExpressionSchema(convertColumnsToSchema(partition.getColumns()));
+    } else {
+      throw new PlanningException(String.format("Not supported PartitonType: %s",
+          expr.getPartitionType()));
+    }
+    return partitionMethodDesc;
+  }
+
   /**
-   * convert table elements into Partition.
+   * It transforms table definition elements to schema.
    *
-   * @param context
-   * @param expr
-   * @return
-   * @throws PlanningException
+   * @param elements to be transformed
+   * @return schema transformed from table definition elements
    */
-  private PartitionDesc convertTableElementsPartition(PlanContext context,
-                                                      CreateTable expr) throws PlanningException {
-    Schema schema = convertTableElementsSchema(expr.getTableElements());
-    PartitionDesc partitionDesc = null;
-    List<Specifier> specifiers = null;
-    if (expr.hasPartition()) {
-      partitionDesc = new PartitionDesc();
-      specifiers = TUtil.newList();
-
-      partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
-          .getPartitionType().name()));
-
-      if (expr.getPartition().getPartitionType().equals(PartitionType.HASH)) {
-        CreateTable.HashPartition hashPartition = expr.getPartition();
-
-        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
-            , hashPartition.getColumns()));
-
-        if (hashPartition.getColumns() != null) {
-          if (hashPartition.getQuantifier() != null) {
-            String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
-            partitionDesc.setNumPartitions(Integer.parseInt(quantity));
-          }
-
-          if (hashPartition.getSpecifiers() != null) {
-            for(CreateTable.PartitionSpecifier eachSpec: hashPartition.getSpecifiers()) {
-              specifiers.add(new Specifier(eachSpec.getName()));
-            }
-          }
-
-          if (specifiers.isEmpty() && partitionDesc.getNumPartitions() > 0) {
-            for (int i = 0; i < partitionDesc.getNumPartitions(); i++) {
-              String partitionName = partitionDesc.getPartitionsType().name() + "_" + expr
-                  .getTableName() + "_" + i;
-              specifiers.add(new Specifier(partitionName));
-            }
-          }
-
-          if (!specifiers.isEmpty())
-            partitionDesc.setSpecifiers(specifiers);
-        }
-      } else if (expr.getPartition().getPartitionType().equals(PartitionType.LIST)) {
-        CreateTable.ListPartition listPartition = expr.getPartition();
-
-        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
-            , listPartition.getColumns()));
-
-        if (listPartition.getSpecifiers() != null) {
-          StringBuffer sb = new StringBuffer();
-
-          for(CreateTable.ListPartitionSpecifier eachSpec: listPartition.getSpecifiers()) {
-            Specifier specifier = new Specifier(eachSpec.getName());
-            sb.delete(0, sb.length());
-            for(Expr eachExpr : eachSpec.getValueList().getValues()) {
-              context.queryBlock.setSchema(schema);
-              EvalNode eval = exprAnnotator.createEvalNode(context.plan, context.queryBlock, eachExpr);
-              if(sb.length() > 1)
-                sb.append(",");
-
-              sb.append(eval.toString());
-            }
-            specifier.setExpressions(sb.toString());
-            specifiers.add(specifier);
-          }
-          if (!specifiers.isEmpty())
-            partitionDesc.setSpecifiers(specifiers);
-        }
-      } else if (expr.getPartition().getPartitionType().equals(PartitionType.RANGE)) {
-        CreateTable.RangePartition rangePartition = expr.getPartition();
-
-        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
-            , rangePartition.getColumns()));
-
-        if (rangePartition.getSpecifiers() != null) {
-          for(CreateTable.RangePartitionSpecifier eachSpec: rangePartition.getSpecifiers()) {
-            Specifier specifier = new Specifier();
-
-            if (eachSpec.getName() != null)
-              specifier.setName(eachSpec.getName());
-
-            if (eachSpec.getEnd() != null) {
-              context.queryBlock.setSchema(schema);
-              EvalNode eval = exprAnnotator.createEvalNode(context.plan, context.queryBlock, eachSpec.getEnd());
-              specifier.setExpressions(eval.toString());
-            }
+  private Schema convertColumnsToSchema(CreateTable.ColumnDefinition[] elements) {
+    Schema schema = new Schema();
 
-            if(eachSpec.isEndMaxValue()) {
-              specifier.setExpressions(null);
-            }
-            specifiers.add(specifier);
-          }
-          if (!specifiers.isEmpty())
-            partitionDesc.setSpecifiers(specifiers);
-        }
-      } else if (expr.getPartition().getPartitionType() == PartitionType.COLUMN) {
-        ColumnPartition columnPartition = expr.getPartition();
-        partitionDesc.setColumns(convertTableElementsSchema(columnPartition.getColumns()).getColumns());
-        partitionDesc.setOmitValues(columnPartition.isOmitValues());
-      }
+    for (CreateTable.ColumnDefinition columnDefinition: elements) {
+      schema.addColumn(convertColumn(columnDefinition));
     }
 
-    return partitionDesc;
+    return schema;
   }
 
-
   /**
    * It transforms table definition elements to schema.
    *
@@ -1487,32 +1396,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return schema;
   }
 
-  private ColumnDefinition[] convertSchemaToTableElements(Schema schema) {
-    List<Column> columns = schema.getColumns();
-    ColumnDefinition[] columnDefinitions = new ColumnDefinition[columns.size()];
-    for(int i = 0; i < columns.size(); i ++) {
-      Column col = columns.get(i);
-      columnDefinitions[i] = new ColumnDefinition(col.getColumnName(), col.getDataType().getType().name());
-    }
-
-    return columnDefinitions;
-  }
-
-  private Collection<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
-                                                         ColumnReferenceExpr[] references) {
-    List<Column> columnList = TUtil.newList();
-
-    for(CreateTable.ColumnDefinition columnDefinition: elements) {
-      for(ColumnReferenceExpr eachReference: references) {
-        if (columnDefinition.getColumnName().equalsIgnoreCase(eachReference.getName())) {
-          columnList.add(convertColumn(columnDefinition));
-        }
-      }
-    }
-
-    return columnList;
-  }
-
   private Column convertColumn(ColumnDefinition columnDefinition) {
     return new Column(columnDefinition.getColumnName(), convertDataType(columnDefinition));
   }
@@ -1541,7 +1424,25 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   ===============================================================================================*/
 
   public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode groupbyNode) {
-    return checkIfBeEvaluateAtThis(evalNode, groupbyNode) && evalNode.getType() == EvalType.AGG_FUNCTION;
+    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
+
+    if (!groupbyNode.getInSchema().containsAll(columnRefs)) {
+      return false;
+    }
+
+    Set<String> tableIds = Sets.newHashSet();
+    // getting distinct table references
+    for (Column col : columnRefs) {
+      if (!tableIds.contains(col.getQualifier())) {
+        tableIds.add(col.getQualifier());
+      }
+    }
+
+    if (tableIds.size() > 1) {
+      return false;
+    }
+
+    return true;
   }
 
   public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode joinNode,
@@ -1560,7 +1461,11 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // at the topmost join operator.
     // TODO - It's also valid that case-when is evalauted at the topmost outer operator.
     //        But, how can we know there is no further outer join operator after this node?
-    return checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin);
+    if (checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin)) {
+      return true;
+    } else {
+      return false;
+    }
   }
 
   private static boolean checkCaseWhenWithOuterJoin(QueryBlock block, EvalNode evalNode, boolean isTopMostJoin) {
@@ -1580,7 +1485,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       return false;
     }
 
-    if (node.getInSchema().containsAll(columnRefs)) {
+    if (node.getTableSchema().containsAll(columnRefs)) {
       // Why? - When a {case when} is used with outer join, case when must be evaluated at topmost outer join.
       if (block.containsJoinType(JoinType.LEFT_OUTER) || block.containsJoinType(JoinType.RIGHT_OUTER)) {
         Collection<CaseWhenEval> found = EvalTreeUtil.findEvalsByType(evalNode, EvalType.CASE);
@@ -1596,6 +1501,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
   public static boolean checkIfBeEvaluateAtThis(EvalNode evalNode, LogicalNode node) {
     Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
-    return node.getInSchema().containsAll(columnRefs);
+    if (!node.getInSchema().containsAll(columnRefs)) {
+      return false;
+    }
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 7de053b..202c59d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -91,7 +91,6 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
                                            PhysicalExec execPlan) throws IOException {
     DataChannel channel = context.getDataChannel();
     ShuffleFileWriteNode shuffleFileWriteNode = new ShuffleFileWriteNode(UNGENERATED_PID);
-    shuffleFileWriteNode.setTableName(channel.getTargetId().toString());
     shuffleFileWriteNode.setStorageType(context.getDataChannel().getStoreType());
     shuffleFileWriteNode.setInSchema(plan.getOutSchema());
     shuffleFileWriteNode.setOutSchema(plan.getOutSchema());
@@ -675,12 +674,12 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   public PhysicalExec createStorePlan(TaskAttemptContext ctx,
                                       StoreTableNode plan, PhysicalExec subOp) throws IOException {
 
-    if (plan.getPartitions() != null) {
-      switch (plan.getPartitions().getPartitionsType()) {
+    if (plan.getPartitionMethod() != null) {
+      switch (plan.getPartitionMethod().getPartitionType()) {
       case COLUMN:
         return new ColumnPartitionedTableStoreExec(ctx, plan, subOp);
       default:
-        throw new IllegalStateException(plan.getPartitions().getPartitionsType() + " is not supported yet.");
+        throw new IllegalStateException(plan.getPartitionMethod().getPartitionType() + " is not supported yet.");
       }
     } else {
       return new StoreTableExec(ctx, plan, subOp);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 663954e..ef4e60a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -29,7 +29,7 @@ import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
@@ -644,7 +644,7 @@ public class PlannerUtil {
   }
 
   public static Schema rewriteColumnPartitionedTableSchema(
-                               PartitionDesc partitionDesc,
+                               PartitionMethodDesc partitionDesc,
                                Schema columnPartitionSchema,
                                Schema sourceSchema,
                                String qualifier) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 3e756d5..6e44bb8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
@@ -433,10 +433,10 @@ public class GlobalPlanner {
                                         ExecutionBlock childBlock,
                                         StoreTableNode currentNode) 
     throws PlanningException {
-    PartitionDesc partitionDesc = currentNode.getPartitions();
+    PartitionMethodDesc partitionMethod = currentNode.getPartitionMethod();
 
     // if result table is not a partitioned table, directly store it
-    if(partitionDesc == null) {
+    if(partitionMethod == null) {
 
       if (childBlock.getPlan() == null) { // when the below is union
         for (ExecutionBlock grandChildBlock : context.plan.getChilds(childBlock)) {
@@ -463,24 +463,22 @@ public class GlobalPlanner {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
     DataChannel channel;
-    CatalogProtos.PartitionsType partitionsType = partitionDesc.getPartitionsType();
+    CatalogProtos.PartitionType partitionsType = partitionMethod.getPartitionType();
 
-    if(partitionsType == CatalogProtos.PartitionsType.COLUMN) {
+    if(partitionsType == CatalogProtos.PartitionType.COLUMN) {
       channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
-      Column[] columns = new Column[partitionDesc.getColumns().size()];
-
       if (currentNode.getType() == NodeType.INSERT) {
         InsertNode insertNode = (InsertNode) currentNode;
         channel.setSchema(((InsertNode)currentNode).getProjectedSchema());
-        Column [] shuffleKeys = new Column[partitionDesc.getColumns().size()];
+        Column [] shuffleKeys = new Column[partitionMethod.getExpressionSchema().getColumnNum()];
         int i = 0;
-        for (Column column : partitionDesc.getColumns()) {
+        for (Column column : partitionMethod.getExpressionSchema().getColumns()) {
           int id = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
           shuffleKeys[i++] = insertNode.getProjectedSchema().getColumn(id);
         }
         channel.setShuffleKeys(shuffleKeys);
       } else {
-        channel.setShuffleKeys(partitionDesc.getColumns().toArray(columns));
+        channel.setShuffleKeys(partitionMethod.getExpressionSchema().toArray());
       }
       channel.setSchema(childNode.getOutSchema());
       channel.setStoreType(storeType);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 66a2355..c266bbc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -34,14 +34,24 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
     super(pid, NodeType.CREATE_TABLE);
   }
 
-  public void setSchema(Schema schema) {
+  public void setTableSchema(Schema schema) {
     this.schema = schema;
   }
     
-  public Schema getSchema() {
+  public Schema getTableSchema() {
     return this.schema;
   }
 
+  public Schema getLogicalSchema() {
+    if (hasPartition()) {
+      Schema logicalSchema = new Schema(schema);
+      logicalSchema.addColumns(getPartitionMethod().getExpressionSchema());
+      return logicalSchema;
+    } else {
+      return schema;
+    }
+  }
+
   public boolean hasPath() {
     return this.path != null;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
index d687829..824e858 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -35,7 +35,7 @@ public class PartitionedTableScanNode extends ScanNode {
     this.setInSchema(scanNode.getInSchema());
     this.setOutSchema(scanNode.getOutSchema());
     this.alias = scanNode.alias;
-    this.renamedSchema = scanNode.renamedSchema;
+    this.logicalSchema = scanNode.logicalSchema;
     this.qual = scanNode.qual;
     this.targets = scanNode.targets;
     this.inputPaths = inputPaths;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
index 1bd3548..2a357c4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PersistentStoreNode.java
@@ -32,7 +32,6 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
  * This includes some basic information for materializing data.
  */
 public abstract class PersistentStoreNode extends UnaryNode implements Cloneable {
-  @Expose protected String tableName;
   @Expose protected StoreType storageType = StoreType.CSV;
   @Expose protected Options options;
 
@@ -40,18 +39,6 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
     super(pid, nodeType);
   }
 
-  public boolean hasTargetTable() {
-    return tableName != null;
-  }
-
-  public void setTableName(String tableName) {
-    this.tableName = tableName;
-  }
-
-  public final String getTableName() {
-    return this.tableName;
-  }
-
   public void setStorageType(StoreType storageType) {
     this.storageType = storageType;
   }
@@ -75,7 +62,6 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
   @Override
   public PlanString getPlanString() {
     PlanString planStr = new PlanString("Store");
-    planStr.appendTitle(" into ").appendTitle(tableName);
     planStr.addExplan("Store type: " + storageType);
 
     return planStr;
@@ -86,7 +72,6 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
     if (obj instanceof PersistentStoreNode) {
       PersistentStoreNode other = (PersistentStoreNode) obj;
       boolean eq = super.equals(other);
-      eq = eq && this.tableName.equals(other.tableName);
       eq = eq && this.storageType.equals(other.storageType);
       eq = eq && TUtil.checkEquals(options, other.options);
       return eq;
@@ -98,25 +83,8 @@ public abstract class PersistentStoreNode extends UnaryNode implements Cloneable
   @Override
   public Object clone() throws CloneNotSupportedException {
     PersistentStoreNode store = (PersistentStoreNode) super.clone();
-    store.tableName = tableName;
     store.storageType = storageType != null ? storageType : null;
     store.options = options != null ? (Options) options.clone() : null;
     return store;
   }
-
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("\"Store\": {\"table\": \""+tableName);
-    if (storageType != null) {
-      sb.append(", storage: "+ storageType.name());
-    }
-
-    sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
-        .append("\n  \"in schema\": ").append(getInSchema());
-
-    sb.append("}");
-
-    return sb.toString() + "\n"
-        + getChild().toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 165e1e8..155c09f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -26,12 +26,13 @@ import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 public class ScanNode extends RelationNode implements Projectable {
 	@Expose protected TableDesc tableDesc;
   @Expose protected String alias;
-  @Expose protected Schema renamedSchema;
+  @Expose protected Schema logicalSchema;
 	@Expose protected EvalNode qual;
 	@Expose protected Target[] targets;
 
@@ -45,13 +46,16 @@ public class ScanNode extends RelationNode implements Projectable {
     this.tableDesc = desc;
     this.setInSchema(tableDesc.getSchema());
     this.setOutSchema(tableDesc.getSchema());
+    logicalSchema = SchemaUtil.getQualifiedLogicalSchema(tableDesc, null);
   }
   
 	public ScanNode(int pid, TableDesc desc, String alias) {
     this(pid, desc);
     this.alias = PlannerUtil.normalizeTableName(alias);
-    renamedSchema = getOutSchema();
-    renamedSchema.setQualifier(this.alias);
+    this.setInSchema(tableDesc.getSchema());
+    this.getInSchema().setQualifier(alias);
+    this.setOutSchema(new Schema(getInSchema()));
+    logicalSchema = SchemaUtil.getQualifiedLogicalSchema(tableDesc, alias);
 	}
 	
 	public String getTableName() {
@@ -67,7 +71,11 @@ public class ScanNode extends RelationNode implements Projectable {
   }
 
   public Schema getTableSchema() {
-    return hasAlias() ? renamedSchema : tableDesc.getSchema();
+    return logicalSchema;
+  }
+
+  public Schema getPhysicalSchema() {
+    return getInSchema();
   }
 	
 	public boolean hasQual() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
index 3abec9d..e4ef49b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ShuffleFileWriteNode.java
@@ -88,9 +88,9 @@ public class ShuffleFileWriteNode extends PersistentStoreNode implements Cloneab
 
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("\"Store\": {\"table\": \""+tableName);
+    sb.append("\"Store\":");
     if (storageType != null) {
-      sb.append(", storage: "+ storageType.name());
+      sb.append(" storage: "+ storageType.name());
     }
     sb.append(", partnum: ").append(numOutputs).append("}")
     .append(", ");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index 2aca6a7..47e33c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -19,12 +19,13 @@
 package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
 public class StoreTableNode extends PersistentStoreNode implements Cloneable {
-  @Expose private PartitionDesc partitionDesc;
+  @Expose protected String tableName;
+  @Expose private PartitionMethodDesc partitionDesc;
 
   public StoreTableNode(int pid) {
     super(pid, NodeType.STORE);
@@ -34,15 +35,27 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
     super(pid, nodeType);
   }
 
+  public boolean hasTargetTable() {
+    return tableName != null;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public final String getTableName() {
+    return this.tableName;
+  }
+
   public boolean hasPartition() {
     return this.partitionDesc != null;
   }
 
-  public PartitionDesc getPartitions() {
+  public PartitionMethodDesc getPartitionMethod() {
     return partitionDesc;
   }
 
-  public void setPartitions(PartitionDesc partitionDesc) {
+  public void setPartitionMethod(PartitionMethodDesc partitionDesc) {
     this.partitionDesc = partitionDesc;
   }
 
@@ -60,6 +73,7 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
     if (obj instanceof StoreTableNode) {
       StoreTableNode other = (StoreTableNode) obj;
       boolean eq = super.equals(other);
+      eq = eq && this.tableName.equals(other.tableName);
       eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
       return eq;
     } else {
@@ -70,7 +84,8 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     StoreTableNode store = (StoreTableNode) super.clone();
-    store.partitionDesc = partitionDesc;
+    store.tableName = tableName;
+    store.partitionDesc = partitionDesc != null ? (PartitionMethodDesc) partitionDesc.clone() : null;
     return store;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index 338d2f2..cee9bba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -28,16 +28,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.InsertNode;
+import org.apache.tajo.engine.planner.logical.CreateTableNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.storage.StorageUtil;
@@ -50,8 +48,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
-
 /**
  * This class is a physical operator to store at column partitioned table.
  */
@@ -71,6 +67,10 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
 
+    if (plan.getType() == NodeType.CREATE_TABLE) {
+      this.outSchema = ((CreateTableNode)plan).getTableSchema();
+    }
+
     // set table meta
     if (this.plan.hasOptions()) {
       meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
@@ -78,24 +78,22 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
 
-    // Rewrite a output schema because we don't have to store field values
-    // corresponding to partition key columns.
-    if (plan.getPartitions() != null && plan.getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
-      rewriteColumnPartitionedTableSchema();
-    }
-
     // Find column index to name subpartition directory path
-    int partitionKeyNum = this.plan.getPartitions().getColumns().size();
+    int partitionKeyNum = this.plan.getPartitionMethod().getExpressionSchema().getColumnNum();
     partitionColumnIndices = new int[partitionKeyNum];
     partitionColumnNames = new String[partitionKeyNum];
     for (int i = 0; i < partitionKeyNum; i++) {
-      Column column = this.plan.getPartitions().getColumns().get(i);
+      Column column = this.plan.getPartitionMethod().getExpressionSchema().getColumn(i);
       partitionColumnNames[i] = column.getColumnName();
 
       if (this.plan.getType() == NodeType.INSERT) {
         InsertNode insertNode = ((InsertNode)plan);
         int idx = insertNode.getTableSchema().getColumnId(column.getQualifiedName());
         partitionColumnIndices[i] = idx;
+      } else if (this.plan.getType() == NodeType.CREATE_TABLE) {
+        CreateTableNode createTable = (CreateTableNode) plan;
+        int idx = createTable.getLogicalSchema().getColumnId(column.getQualifiedName());
+        partitionColumnIndices[i] = idx;
       } else {
         // We can get partition column from a logical schema.
         // Don't use output schema because it is rewritten.
@@ -104,23 +102,6 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
     }
   }
 
-  /**
-   * This method rewrites an input schema of column-partitioned table because
-   * there are no actual field values in data file in a column-partitioned table.
-   * So, this method removes partition key columns from the input schema.
-   */
-  private void rewriteColumnPartitionedTableSchema() {
-    PartitionDesc partitionDesc = plan.getPartitions();
-    Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
-    String qualifier = plan.getTableName();
-
-    outSchema = PlannerUtil.rewriteColumnPartitionedTableSchema(
-                                             partitionDesc,
-                                             columnPartitionSchema,
-                                             outSchema,
-                                             qualifier);
-  }
-
   public void init() throws IOException {
     super.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index d533b82..a4153dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -44,7 +44,7 @@ public class HashAggregateExec extends AggregationExec {
     hashTable = new HashMap<Tuple, FunctionContext []>(100000);
     this.tuple = new VTuple(plan.getOutSchema().getColumnNum());
   }
-  
+
   private void compute() throws IOException {
     Tuple tuple;
     Tuple keyTuple;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index c38be92..31a944c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -20,14 +20,13 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.eval.ConstEval;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.eval.FieldEval;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.ScanNode;
@@ -42,7 +41,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
 
 public class SeqScanExec extends PhysicalExec {
   private final ScanNode plan;
@@ -73,17 +71,14 @@ public class SeqScanExec extends PhysicalExec {
    * indicate partition keys. In this time, it is right. Later, we have to fix it.
    */
   private void rewriteColumnPartitionedTableSchema() throws IOException {
-    PartitionDesc partitionDesc = plan.getTableDesc().getPartitions();
-    Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+    PartitionMethodDesc partitionDesc = plan.getTableDesc().getPartitionMethod();
+    Schema columnPartitionSchema = (Schema) partitionDesc.getExpressionSchema().clone();
     String qualifier = inSchema.getColumn(0).getQualifier();
     columnPartitionSchema.setQualifier(qualifier);
 
     // Remove partition key columns from an input schema.
-    this.inSchema = PlannerUtil.rewriteColumnPartitionedTableSchema(
-                                                 partitionDesc,
-                                                 columnPartitionSchema,
-                                                 inSchema,
-                                                 qualifier);
+    this.inSchema = plan.getTableDesc().getSchema();
+
 
     List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
 
@@ -97,6 +92,7 @@ public class SeqScanExec extends PhysicalExec {
       FieldEval targetExpr = new FieldEval(column);
       Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
       ConstEval constExpr = new ConstEval(datum);
+
       for (Target target : plan.getTargets()) {
         if (target.getEvalTree().equals(targetExpr)) {
           if (!target.hasAlias()) {
@@ -117,8 +113,8 @@ public class SeqScanExec extends PhysicalExec {
   public void init() throws IOException {
     Schema projected;
 
-    if (plan.getTableDesc().hasPartitions()
-        && plan.getTableDesc().getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+    if (plan.getTableDesc().hasPartition()
+        && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
       rewriteColumnPartitionedTableSchema();
     }
 
@@ -146,12 +142,13 @@ public class SeqScanExec extends PhysicalExec {
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
 
     if (fragments.length > 1) {
-      this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
+      this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),
           FragmentConvertor.<FileFragment>convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(),
               fragments), projected);
     } else {
       this.scanner = StorageManagerFactory.getStorageManager(
-          context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getTableSchema(), fragments[0], projected);
+          context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragments[0],
+          projected);
     }
 
     scanner.init();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 041220a..a1cbe76 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.*;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
@@ -65,7 +65,7 @@ public class PartitionedTableRewriter implements RewriteRule {
       for (RelationNode relation : block.getRelations()) {
         if (relation.getType() == NodeType.SCAN) {
           TableDesc table = ((ScanNode)relation).getTableDesc();
-          if (table.hasPartitions()) {
+          if (table.hasPartition()) {
             return true;
           }
         }
@@ -82,7 +82,7 @@ public class PartitionedTableRewriter implements RewriteRule {
       for (RelationNode relation : block.getRelations()) {
         if (relation.getType() == NodeType.SCAN) {
           TableDesc table = ((ScanNode)relation).getTableDesc();
-          if (table.hasPartitions()) {
+          if (table.hasPartition()) {
             containsPartitionedTables = true;
           }
         }
@@ -237,10 +237,10 @@ public class PartitionedTableRewriter implements RewriteRule {
     FileSystem fs = table.getPath().getFileSystem(systemConf);
     LOG.info("Partitioned Table Dir: " + table.getPath());
     LOG.info("Summary: " + fs.getContentSummary(table.getPath()).getDirectoryCount());
-    PartitionDesc partitionDesc = scanNode.getTableDesc().getPartitions();
+    PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod();
 
     Schema paritionValuesSchema = new Schema();
-    for (Column column : partitionDesc.getColumns()) {
+    for (Column column : partitionDesc.getExpressionSchema().getColumns()) {
       paritionValuesSchema.addColumn(column);
     }
 
@@ -349,7 +349,7 @@ public class PartitionedTableRewriter implements RewriteRule {
                             Stack<LogicalNode> stack) throws PlanningException {
 
       TableDesc table = scanNode.getTableDesc();
-      if (!table.hasPartitions()) {
+      if (!table.hasPartition()) {
         return null;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index e70e6c2..4e60448 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -690,7 +690,7 @@ public class ProjectionPushDownRule extends
     if (node.hasTargets()) {
       targets = node.getTargets();
     } else {
-      targets = PlannerUtil.schemaToTargets(node.getOutSchema());
+      targets = PlannerUtil.schemaToTargets(node.getTableSchema());
     }
 
     List<Target> projectedTargets = TUtil.newList();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/eb563add/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
index e710f9d..4a3bf46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/QueryContext.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.query;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.NodeType;
@@ -130,16 +130,16 @@ public class QueryContext extends Options {
     return strVal != null ? new Path(strVal) : null;
   }
 
-  public boolean hasPartitions() {
+  public boolean hasPartition() {
     return get(OUTPUT_PARTITIONS) != null;
   }
 
-  public void setPartitions(PartitionDesc partitionDesc) {
-    put(OUTPUT_PARTITIONS, partitionDesc != null ? partitionDesc.toJson() : null);
+  public void setPartitionMethod(PartitionMethodDesc partitionMethodDesc) {
+    put(OUTPUT_PARTITIONS, partitionMethodDesc != null ? partitionMethodDesc.toJson() : null);
   }
 
-  public PartitionDesc getPartitions() {
-    return PartitionDesc.fromJson(get(OUTPUT_PARTITIONS));
+  public PartitionMethodDesc getPartitionMethod() {
+    return PartitionMethodDesc.fromJson(get(OUTPUT_PARTITIONS));
   }
 
   public void setOutputOverwrite() {