Posted to commits@tajo.apache.org by hy...@apache.org on 2013/12/19 04:10:33 UTC

[1/3] TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables. (hyunsik)

Updated Branches:
  refs/heads/master 4dbecf901 -> f58f6ee82


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index ed529a9..e7eaf40 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -19,26 +19,34 @@
 package org.apache.tajo.engine.utils;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.dataserver.HttpUtil;
 
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -401,4 +409,102 @@ public class TupleUtil {
     }
     return aTuple;
   }
+
+  @SuppressWarnings("unused")
+  public static Collection<Tuple> filterTuple(Schema schema, Collection<Tuple> tupleBlock, EvalNode filterCondition) {
+    TupleBlockFilterScanner filter = new TupleBlockFilterScanner(schema, tupleBlock, filterCondition);
+    return filter.nextBlock();
+  }
+
+  private static class TupleBlockFilterScanner {
+    private EvalNode qual;
+    private EvalContext qualCtx;
+    private Iterator<Tuple> iterator;
+    private Schema schema;
+
+    public TupleBlockFilterScanner(Schema schema, Collection<Tuple> tuples, EvalNode qual) {
+      this.schema = schema;
+      this.qual = qual;
+      this.qualCtx = qual.newContext();
+      this.iterator = tuples.iterator();
+    }
+
+    public List<Tuple> nextBlock() {
+      List<Tuple> results = Lists.newArrayList();
+
+      Tuple tuple;
+      while (iterator.hasNext()) {
+        tuple = iterator.next();
+        qual.eval(qualCtx, schema, tuple);
+        if (qual.terminate(qualCtx).asBool()) {
+          results.add(tuple);
+        }
+      }
+      return results;
+    }
+  }
+
+  /**
+   * A column partition path consists of a table path part and a column values
+   * part. This method transforms the column values part of a partition path
+   * into a tuple according to a given partition column schema.
+   *
+   * hdfs://192.168.0.1/tajo/warehouse/table1/col1=abc/col2=def/col3=ghi
+   *                   ^^^^^^^^^^^^^^^^^^^^^  ^^^^^^^^^^^^^^^^^^^^^^^^^^
+   *                      table path part        column values part
+   *
+   * When a file path is given, the behavior depends on the beNullIfFile flag.
+   * If the flag is true, this method returns NULL when the given path is a file.
+   * Otherwise, it returns the built tuple regardless of whether the path is a
+   * file or a directory.
+   *
+   * @param partitionColumnSchema The partition column schema
+   * @param partitionPath The partition path
+   * @param beNullIfFile If true, this method returns NULL when a given path is a file.
+   * @return The tuple transformed from a column values part.
+   */
+  public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Path partitionPath,
+                                                  boolean beNullIfFile) {
+    int startIdx = partitionPath.toString().indexOf(getColumnPartitionPathPrefix(partitionColumnSchema));
+
+    if (startIdx == -1) { // if there is no partition column in the path
+      return null;
+    }
+    String columnValuesPart = partitionPath.toString().substring(startIdx);
+
+    String [] columnValues = columnValuesPart.split("/");
+
+    // More path segments than partition columns means the path ends with a file.
+    if (beNullIfFile && partitionColumnSchema.getColumnNum() < columnValues.length) {
+      return null;
+    }
+
+    Tuple tuple = new VTuple(partitionColumnSchema.getColumnNum());
+    int i = 0;
+    for (; i < columnValues.length && i < partitionColumnSchema.getColumnNum(); i++) {
+      String [] parts = columnValues[i].split("=");
+      if (parts.length != 2) {
+        return null;
+      }
+      int columnId = partitionColumnSchema.getColumnIdByName(parts[0]);
+      Column keyColumn = partitionColumnSchema.getColumn(columnId);
+      tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), parts[1]));
+    }
+    for (; i < partitionColumnSchema.getColumnNum(); i++) {
+      tuple.put(i, NullDatum.get());
+    }
+    return tuple;
+  }
+
+  /**
+   * Get the prefix of a column partition path. For example, given a column
+   * partition (col1, col2), this method returns the string 'col1='.
+   *
+   * @param partitionColumn the schema of the column partition
+   * @return The prefix of the first path segment of a column partition path.
+   */
+  private static String getColumnPartitionPathPrefix(Schema partitionColumn) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(partitionColumn.getColumn(0).getColumnName()).append("=");
+    return sb.toString();
+  }
 }
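
For reference, the two new utilities above are the building blocks for pruning
column partitions: buildTupleFromPartitionPath() rebuilds a tuple from a
partition directory name such as .../col1=abc/col2=def, and an EvalNode filter
can then be evaluated against that tuple in the same way TupleBlockFilterScanner
does. The following is only a rough, hypothetical sketch of how the two could be
combined; the class and method names (PartitionPruningSketch, prunePartitionPaths)
are invented for the example, it is not part of this patch, and for brevity only
a single partition-column level is listed.

// A hypothetical sketch (not part of this patch): prune partition directories
// by evaluating a filter condition against the tuple rebuilt from each
// directory name. Only the first partition-column level is handled here.
package org.apache.tajo.example;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.engine.eval.EvalContext;
import org.apache.tajo.engine.eval.EvalNode;
import org.apache.tajo.engine.utils.TupleUtil;
import org.apache.tajo.storage.Tuple;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class PartitionPruningSketch {

  public static List<Path> prunePartitionPaths(FileSystem fs, Path tablePath,
                                               Schema partitionColumnSchema,
                                               EvalNode filter) throws IOException {
    List<Path> accepted = new ArrayList<Path>();
    EvalContext ctx = filter.newContext();

    for (FileStatus status : fs.listStatus(tablePath)) {
      if (!fs.isDirectory(status.getPath())) {
        continue; // only partition directories are candidates
      }
      // Rebuild the partition key tuple from a path like .../col1=abc
      Tuple partitionKeys = TupleUtil.buildTupleFromPartitionPath(
          partitionColumnSchema, status.getPath(), true);
      if (partitionKeys == null) {
        accepted.add(status.getPath()); // not a recognizable partition path; keep it
        continue;
      }
      // Evaluate the filter against the partition columns only
      filter.eval(ctx, partitionColumnSchema, partitionKeys);
      if (filter.terminate(ctx).asBool()) {
        accepted.add(status.getPath());
      }
    }
    return accepted;
  }
}

Evaluating the filter against only the partition columns, before any data file
is opened, is what allows whole partition directories to be skipped.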

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index d432017..b1deb43 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.engine.planner.global.ExecutionBlock;
+import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequest;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
@@ -470,7 +472,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               task.getLogicalPlan().toJson(),
               context.getQueryContext(),
               subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
-          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
 
@@ -490,6 +492,19 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       LOG.debug("RackLocalAssigned: " + rackLocalAssigned + " / " + totalAssigned);
     }
 
+    private boolean checkIfInterQuery(MasterPlan masterPlan, ExecutionBlock block) {
+      if (masterPlan.isRoot(block)) {
+        return false;
+      }
+
+      ExecutionBlock parent = masterPlan.getParent(block);
+      if (masterPlan.isRoot(parent) && parent.hasUnion()) {
+        return false;
+      }
+
+      return true;
+    }
+
     public void assignToNonLeafTasks(List<TaskRequestEvent> taskRequests) {
       Iterator<TaskRequestEvent> it = taskRequests.iterator();
 
@@ -519,7 +534,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
               context.getQueryContext(),
               subQuery.getDataChannel(),
               subQuery.getBlock().getEnforcer());
-          if (!subQuery.getMasterPlan().isRoot(subQuery.getBlock())) {
+          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
             taskAssign.setInterQuery();
           }
           for (ScanNode scan : task.getScanNodes()) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 10f42c5..4575bf8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -32,7 +32,7 @@ import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.AlreadyExistsTableException;
 import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
@@ -86,7 +86,7 @@ public class GlobalEngine extends AbstractService {
       analyzer = new SQLAnalyzer();
       converter = new HiveConverter();
       planner = new LogicalPlanner(context.getCatalog());
-      optimizer = new LogicalOptimizer();
+      optimizer = new LogicalOptimizer(context.getConf());
       verifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());
 
       hookManager = new DistributedQueryHookManager();
@@ -234,7 +234,7 @@ public class GlobalEngine extends AbstractService {
     if (!state.verified()) {
       StringBuilder sb = new StringBuilder();
       for (String error : state.getErrorMessages()) {
-        sb.append("ERROR: ").append(error).append("\n");
+        sb.append(error).append("\n");
       }
       throw new VerifyException(sb.toString());
     }
@@ -263,7 +263,7 @@ public class GlobalEngine extends AbstractService {
   }
 
   public TableDesc createTableOnPath(String tableName, Schema schema, TableMeta meta,
-                                     Path path, boolean isCreated, Partitions partitions)
+                                     Path path, boolean isCreated, PartitionDesc partitionDesc)
       throws IOException {
     if (catalog.existsTable(tableName)) {
       throw new AlreadyExistsTableException(tableName);
@@ -291,7 +291,9 @@ public class GlobalEngine extends AbstractService {
     stats.setNumBytes(totalSize);
     TableDesc desc = CatalogUtil.newTableDesc(tableName, schema, meta, path);
     desc.setStats(stats);
-    desc.setPartitions(partitions);
+    if (partitionDesc != null) {
+      desc.setPartitions(partitionDesc);
+    }
     catalog.addTable(desc);
 
     LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 863ed18..861147a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -35,8 +35,7 @@ import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.exception.NoSuchTableException;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.ipc.ClientProtos;
@@ -316,12 +315,15 @@ public class TajoMasterClientService extends AbstractService {
 
         Schema schema = new Schema(request.getSchema());
         TableMeta meta = new TableMeta(request.getMeta());
-        Partitions partitions = new Partitions(request.getPartitions());
+        PartitionDesc partitionDesc = null;
+        if (request.hasPartitions()) {
+          partitionDesc = new PartitionDesc(request.getPartitions());
+        }
 
         TableDesc desc;
         try {
           desc = context.getGlobalEngine().createTableOnPath(request.getName(), schema,
-              meta, path, false, partitions);
+              meta, path, false, partitionDesc);
         } catch (Exception e) {
           return TableResponse.newBuilder()
               .setResultCode(ResultCode.ERROR)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 828ebfa..5d717ee 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -266,7 +266,7 @@ public class QueryMasterTask extends CompositeService {
 
     CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
     LogicalPlanner planner = new LogicalPlanner(catalog);
-    LogicalOptimizer optimizer = new LogicalOptimizer();
+    LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
     Expr expr;
     if (queryContext.isHiveQueryMode()) {
       HiveConverter hiveConverter = new HiveConverter();
@@ -297,6 +297,14 @@ public class QueryMasterTask extends CompositeService {
             tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
           }
         }
+
+        scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.PARTITIONS_SCAN);
+        if(scanNodes != null) {
+          for(LogicalNode eachScanNode: scanNodes) {
+            ScanNode scanNode = (ScanNode)eachScanNode;
+            tableDescMap.put(scanNode.getCanonicalName(), scanNode.getTableDesc());
+          }
+        }
       }
 
       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 7fedd4f..03659c3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -43,10 +43,7 @@ import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.AbstractTaskScheduler;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
@@ -745,21 +742,42 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       subQuery.eventHandler.handle(event);
     }
 
+    /**
+     * Creates fragments for all partitions of a column-partitioned table.
+     */
+    private static Collection<FileFragment> getFragmentsFromPartitionedTable(SubQuery subQuery,
+                                                                      ScanNode scan,
+                                                                      TableDesc table) throws IOException {
+      List<FileFragment> fragments = Lists.newArrayList();
+      PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
+      for (Path path : partitionsScan.getInputPaths()) {
+        fragments.addAll(subQuery.getStorageManager().getSplits(
+            scan.getCanonicalName(), table.getMeta(), table.getSchema(), path));
+      }
+      partitionsScan.setInputPaths(null);
+      return fragments;
+    }
+
     private static QueryUnit [] createLeafTasks(SubQuery subQuery) throws IOException {
       ExecutionBlock execBlock = subQuery.getBlock();
       ScanNode[] scans = execBlock.getScanNodes();
       Preconditions.checkArgument(scans.length == 1, "Must be Scan Query");
-      TableMeta meta;
-      Path inputPath;
-
       ScanNode scan = scans[0];
-      TableDesc desc = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
-      inputPath = desc.getPath();
-      meta = desc.getMeta();
+      TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
+
+      Collection<FileFragment> fragments;
+      TableMeta meta = table.getMeta();
 
-      // TODO - should be change the inner directory
-      List<FileFragment> fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
-          inputPath);
+      // Fragments are created depending on the scan node's type. If the scan is
+      // for a partitioned table, fragments are created for all of its partitions.
+      // Otherwise, at least one fragment is created for the table, which may
+      // span a number of blocks or possibly consist of a number of files.
+      if (scan.getType() == NodeType.PARTITIONS_SCAN) {
+        fragments = getFragmentsFromPartitionedTable(subQuery, scan, table);
+      } else {
+        Path inputPath = table.getPath();
+        fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
+      }
 
       QueryUnit queryUnit;
       List<QueryUnit> queryUnits = new ArrayList<QueryUnit>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
index d95c352..0637023 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtos.proto
@@ -124,7 +124,7 @@ message CreateTableRequest {
   required SchemaProto schema = 2;
   required TableProto meta = 3;
   required string path = 4;
-  optional PartitionsProto partitions = 5;
+  optional PartitionDescProto partitions = 5;
 }
 
 message DropTableRequest {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 8e257f2..44004d2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -380,8 +380,8 @@ public class TestTajoClient {
 
     assertFalse(client.existTable(tableName));
 
-    String sql = "create table " + tableName + " (deptname text, score int4)";
-    sql += "PARTITION BY COLUMN (deptname)";
+    String sql = "create table " + tableName + " (score int4)";
+    sql += "PARTITION BY COLUMN (deptname text)";
 
     client.updateQuery(sql);
     assertTrue(client.existTable(tableName));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index c35c786..ca1259b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -97,7 +97,7 @@ public class ExprTestBase {
     Schema inputSchema = null;
     if (schema != null) {
       inputSchema = (Schema) schema.clone();
-      inputSchema.setQualifier(tableName, true);
+      inputSchema.setQualifier(tableName);
 
       int targetIdx [] = new int[inputSchema.getColumnNum()];
       for (int i = 0; i < targetIdx.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 1850081..3c67068 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -169,6 +169,7 @@ public class TestEvalTreeUtil {
     "select score from people where 10 * 2 > score * 10", // 4
     "select score from people where score < 10 and 4 < score", // 5
     "select score from people where score < 10 and 4 < score and age > 5", // 6
+    "select score from people where (score > 1 and score < 3) or (7 < score and score < 10)", // 7
   };
   
   @Test
@@ -208,7 +209,7 @@ public class TestEvalTreeUtil {
   public final void testGetCNF() {
     // "select score from people where score < 10 and 4 < score "
     EvalNode node = getRootSelection(QUERIES[5]);
-    EvalNode [] cnf = EvalTreeUtil.getConjNormalForm(node);
+    EvalNode [] cnf = AlgebraicUtil.toConjunctiveNormalFormArray(node);
     
     Column col1 = new Column("people.score", TajoDataTypes.Type.INT4);
     
@@ -235,26 +236,37 @@ public class TestEvalTreeUtil {
   public final void testTransformCNF2Singleton() {
     // "select score from people where score < 10 and 4 < score "
     EvalNode node = getRootSelection(QUERIES[6]);
-    EvalNode [] cnf1 = EvalTreeUtil.getConjNormalForm(node);
+    EvalNode [] cnf1 = AlgebraicUtil.toConjunctiveNormalFormArray(node);
     assertEquals(3, cnf1.length);
     
-    EvalNode conj = EvalTreeUtil.transformCNF2Singleton(cnf1);
-    EvalNode [] cnf2 = EvalTreeUtil.getConjNormalForm(conj);
+    EvalNode conj = AlgebraicUtil.createSingletonExprFromCNF(cnf1);
+    EvalNode [] cnf2 = AlgebraicUtil.toConjunctiveNormalFormArray(conj);
     
     Set<EvalNode> set1 = Sets.newHashSet(cnf1);
     Set<EvalNode> set2 = Sets.newHashSet(cnf2);
     assertEquals(set1, set2);
   }
+
+  @Test
+  public final void testGetDNF() {
+    // "select score from people where score > 1 and score < 3 or score > 7 and score < 10", // 7
+    EvalNode node = getRootSelection(QUERIES[7]);
+    EvalNode [] cnf = AlgebraicUtil.toDisjunctiveNormalFormArray(node);
+    assertEquals(2, cnf.length);
+
+    assertEquals("people.score (INT4(0)) > 1 AND people.score (INT4(0)) < 3", cnf[0].toString());
+    assertEquals("7 < people.score (INT4(0)) AND people.score (INT4(0)) < 10", cnf[1].toString());
+  }
   
   @Test
   public final void testSimplify() throws PlanningException {
     Target [] targets = getRawTargets(QUERIES[0]);
-    EvalNode node = AlgebraicUtil.simplify(targets[0].getEvalTree());
+    EvalNode node = AlgebraicUtil.eliminateConstantExprs(targets[0].getEvalTree());
     EvalContext nodeCtx = node.newContext();
     assertEquals(EvalType.CONST, node.getType());
     node.eval(nodeCtx, null, null);
     assertEquals(7, node.terminate(nodeCtx).asInt4());
-    node = AlgebraicUtil.simplify(targets[1].getEvalTree());
+    node = AlgebraicUtil.eliminateConstantExprs(targets[1].getEvalTree());
     assertEquals(EvalType.CONST, node.getType());
     nodeCtx = node.newContext();
     node.eval(nodeCtx, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
index 6775aa9..389a9e5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/parser/TestSQLAnalyzer.java
@@ -24,7 +24,6 @@ import org.apache.tajo.algebra.CreateTable;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.LiteralValue;
 import org.apache.tajo.algebra.OpType;
-import org.apache.tajo.engine.parser.SQLParser.Boolean_value_expressionContext;
 import org.apache.tajo.engine.parser.SQLParser.SqlContext;
 import org.apache.tajo.util.FileUtil;
 import org.junit.Test;
@@ -321,9 +320,9 @@ public class TestSQLAnalyzer {
     assertEquals(CreateTable.PartitionType.COLUMN, createTable.getPartition().getPartitionType());
     CreateTable.ColumnPartition columnPartition = createTable.getPartition();
     assertEquals(3, columnPartition.getColumns().length);
-    assertEquals("col1", columnPartition.getColumns()[0].getCanonicalName());
-    assertEquals("col2", columnPartition.getColumns()[1].getCanonicalName());
-    assertEquals("col3", columnPartition.getColumns()[2].getCanonicalName());
+    assertEquals("col3", columnPartition.getColumns()[0].getColumnName());
+    assertEquals("col4", columnPartition.getColumns()[1].getColumnName());
+    assertEquals("col5", columnPartition.getColumns()[2].getColumnName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index 50bc8dc..92faec0 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -83,7 +83,7 @@ public class TestLogicalOptimizer {
     catalog.createFunction(funcDesc);
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(util.getConfiguration());
   }
 
   @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index 025c84b..1b38acd 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -74,7 +74,7 @@ public class TestLogicalPlan {
       catalog.addTable(d);
     }
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(util.getConfiguration());
   }
 
   public static void tearDown() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index c9499d6..a8f4631 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -143,7 +143,7 @@ public class TestBSTIndexExec {
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 57f2376..c43046f 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -122,7 +122,7 @@ public class TestHashAntiJoinExec {
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index d03f3c2..93f9c33 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -126,7 +126,7 @@ public class TestHashSemiJoinExec {
     catalog.addTable(people);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index b57ef3a..b4c66a1 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -150,7 +150,7 @@ public class TestPhysicalPlanner {
     catalog.addTable(score);
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
 
     masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index 45badd5..07f726d 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -96,7 +96,7 @@ public class TestSortExec {
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
index 629ddb5..b770de5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java
@@ -240,205 +240,4 @@ public class TestInsertQuery {
       assertTrue(codec instanceof DeflateCodec);
     }
   }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumn1() throws Exception {
-    String tableName ="InsertOverwritePartitionByColumn1";
-    ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1) ");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName
-        + " select l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-
-    TableDesc desc = catalog.getTableDesc(tableName);
-    Path path = desc.getPath();
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumn2() throws Exception {
-    String tableName ="InsertOverwritePartitionByColumn2";
-    ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1, col2) ");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName
-        + " select l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-
-    TableDesc desc = catalog.getTableDesc(tableName);
-    Path path = desc.getPath();
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumn3() throws Exception {
-    String tableName ="InsertOverwritePartitionByColumn3";
-    ResultSet res = tpch.execute("create table " + tableName +" (col1 int4, col2 int4, col3 float8) partition by column(col1, col2, col3) ");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName
-        + " select l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-
-    TableDesc desc = catalog.getTableDesc(tableName);
-    Path path = desc.getPath();
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.isDirectory(path));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-  }
-  @Test
-  public final void testInsertOverwritePartitionByColumnWithCompression1() throws Exception {
-    String tableName = "testInsertOverwritePartitionByColumnWithCompression1";
-    ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1)");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName + " select  l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-    TableDesc desc = catalog.getTableDesc(tableName);
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.exists(desc.getPath()));
-    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
-    Path path = desc.getPath();
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-
-    for (FileStatus partition : fs.listStatus(path)){
-      assertTrue(fs.isDirectory(partition.getPath()));
-      for (FileStatus file : fs.listStatus(partition.getPath())) {
-        CompressionCodec codec = factory.getCodec(file.getPath());
-        assertTrue(codec instanceof DeflateCodec);
-      }
-    }
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumnWithCompression2() throws Exception {
-    String tableName = "testInsertOverwritePartitionByColumnWithCompression2";
-    ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1, col2)");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName + " select  l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-    TableDesc desc = catalog.getTableDesc(tableName);
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.exists(desc.getPath()));
-    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
-    Path path = desc.getPath();
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-
-    for (FileStatus partition1 : fs.listStatus(path)){
-      assertTrue(fs.isDirectory(partition1.getPath()));
-      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
-        assertTrue(fs.isDirectory(partition2.getPath()));
-        for (FileStatus file : fs.listStatus(partition2.getPath())) {
-          CompressionCodec codec = factory.getCodec(file.getPath());
-          assertTrue(codec instanceof DeflateCodec);
-        }
-      }
-    }
-  }
-
-  @Test
-  public final void testInsertOverwritePartitionByColumnWithCompression3() throws Exception {
-    String tableName = "testInsertOverwritePartitionByColumnWithCompression3";
-    ResultSet res = tpch.execute("create table " + tableName + " (col1 int4, col2 int4, col3 float8) USING csv WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') partition by column(col1, col2, col3)");
-    res.close();
-    TajoTestingCluster cluster = tpch.getTestingCluster();
-    CatalogService catalog = cluster.getMaster().getCatalog();
-    assertTrue(catalog.existsTable(tableName));
-
-    res = tpch.execute("insert overwrite into " + tableName + " select  l_orderkey, l_partkey, l_quantity from lineitem");
-    res.close();
-    TableDesc desc = catalog.getTableDesc(tableName);
-    assertEquals(5, desc.getStats().getNumRows().intValue());
-
-    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
-    assertTrue(fs.exists(desc.getPath()));
-    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
-
-    Path path = desc.getPath();
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
-    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
-
-    for (FileStatus partition1 : fs.listStatus(path)){
-      assertTrue(fs.isDirectory(partition1.getPath()));
-      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
-        assertTrue(fs.isDirectory(partition2.getPath()));
-        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
-          assertTrue(fs.isDirectory(partition3.getPath()));
-          for (FileStatus file : fs.listStatus(partition3.getPath())) {
-            CompressionCodec codec = factory.getCodec(file.getPath());
-            assertTrue(codec instanceof DeflateCodec);
-          }
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
new file mode 100644
index 0000000..4fb5b8a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DeflateCodec;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.catalog.TableDesc;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+public class TestTablePartitions {
+
+  private static TpchTestBase tpch;
+  public TestTablePartitions() throws IOException {
+    super();
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    tpch = TpchTestBase.getInstance();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByOneColumn() throws Exception {
+    String tableName ="testColumnPartitionedTableByOneColumn";
+    ResultSet res = tpch.execute(
+        "create table " + tableName +" (col1 int4, col2 int4, null_col int4) partition by column(key float8) ");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    res = tpch.execute("insert overwrite into " + tableName
+        + " (col1, col2, key) select l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(tableName);
+    Path path = desc.getPath();
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=36.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/key=49.0")));
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    res = tpch.execute(
+        "select distinct * from " + tableName + " where (key = 45.0 or key = 38.0) and null_col is null");
+
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    for (int i = 0; i < 3 && res.next(); i++) {
+      resultRows1.get(res.getDouble(4))[0] = res.getInt(2);
+      resultRows1.get(res.getDouble(4))[1] = res.getInt(3);
+    }
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByThreeColumns() throws Exception {
+    String tableName ="testColumnPartitionedTableByThreeColumns";
+    ResultSet res = tpch.execute(
+        "create table " + tableName +" (col4 text) partition by column(col1 int4, col2 int4, col3 float8) ");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    res = tpch.execute("insert overwrite into " + tableName
+        + " select l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+
+    TableDesc desc = catalog.getTableDesc(tableName);
+    Path path = desc.getPath();
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.isDirectory(path));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    res = tpch.execute("select * from " + tableName + " where col2 = 2");
+
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    for (int i = 0; i < 3 && res.next(); i++) {
+      resultRows1.get(res.getDouble(4))[0] = res.getInt(2);
+      resultRows1.get(res.getDouble(4))[1] = res.getInt(3);
+    }
+    res.close();
+
+    Map<Double, int []> resultRows2 = Maps.newHashMap();
+    resultRows2.put(49.0d, new int[]{3, 3});
+    resultRows2.put(45.0d, new int[]{3, 2});
+    resultRows2.put(38.0d, new int[]{2, 2});
+
+    res = tpch.execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+    for (int i = 0; i < 3 && res.next(); i++) {
+      resultRows2.get(res.getDouble(4))[0] = res.getInt(2);
+      resultRows2.get(res.getDouble(4))[1] = res.getInt(3);
+    }
+    assertFalse(res.next());
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByOneColumnsWithCompression() throws Exception {
+    String tableName = "testColumnPartitionedTableByOneColumnsWithCompression";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " (col2 int4, col3 float8) USING csv " +
+            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+            "PARTITION BY column(col1 int4)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    res = tpch.execute(
+        "insert overwrite into " + tableName + " select  l_partkey, l_quantity, l_orderkey from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+
+    for (FileStatus partition : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition.getPath()));
+      for (FileStatus file : fs.listStatus(partition.getPath())) {
+        CompressionCodec codec = factory.getCodec(file.getPath());
+        assertTrue(codec instanceof DeflateCodec);
+      }
+    }
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws Exception {
+    String tableName = "testColumnPartitionedTableByTwoColumnsWithCompression";
+    ResultSet res = tpch.execute("create table " + tableName + " (col3 float8, col4 text) USING csv " +
+        "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+        "PARTITION by column(col1 int4, col2 int4)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    res = tpch.execute(
+        "insert overwrite into " + tableName +
+            " select  l_quantity, l_returnflag, l_orderkey, l_partkey from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+
+    for (FileStatus partition1 : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition1.getPath()));
+      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+        assertTrue(fs.isDirectory(partition2.getPath()));
+        for (FileStatus file : fs.listStatus(partition2.getPath())) {
+          CompressionCodec codec = factory.getCodec(file.getPath());
+          assertTrue(codec instanceof DeflateCodec);
+        }
+      }
+    }
+  }
+
+  @Test
+  public final void testColumnPartitionedTableByThreeColumnsWithCompression() throws Exception {
+    String tableName = "testColumnPartitionedTableNoMatchedPartition";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " (col4 text) USING csv " +
+            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+            "partition by column(col1 int4, col2 int4, col3 float8)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    res = tpch.execute(
+        "insert overwrite into " + tableName +
+            " select  l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+
+    for (FileStatus partition1 : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition1.getPath()));
+      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+        assertTrue(fs.isDirectory(partition2.getPath()));
+        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
+          assertTrue(fs.isDirectory(partition3.getPath()));
+          for (FileStatus file : fs.listStatus(partition3.getPath())) {
+            CompressionCodec codec = factory.getCodec(file.getPath());
+            assertTrue(codec instanceof DeflateCodec);
+          }
+        }
+      }
+    }
+
+    res = tpch.execute("select * from " + tableName + " where col2 = 2");
+
+    Map<Double, int []> resultRows1 = Maps.newHashMap();
+    resultRows1.put(45.0d, new int[]{3, 2});
+    resultRows1.put(38.0d, new int[]{2, 2});
+
+    for (int i = 0; i < 3 && res.next(); i++) {
+      assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    res.close();
+
+    Map<Double, int []> resultRows2 = Maps.newHashMap();
+    resultRows2.put(49.0d, new int[]{3, 3});
+    resultRows2.put(45.0d, new int[]{3, 2});
+    resultRows2.put(38.0d, new int[]{2, 2});
+
+    res = tpch.execute("select * from " + tableName + " where (col1 = 2 or col1 = 3) and col2 >= 2");
+    for (int i = 0; i < 3 && res.next(); i++) {
+      assertEquals(resultRows2.get(res.getDouble(4))[0], res.getInt(2));
+      assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3));
+    }
+    assertFalse(res.next());
+    res.close();
+  }
+
+  @Test
+  public final void testColumnPartitionedTableNoMatchedPartition() throws Exception {
+    String tableName = "testColumnPartitionedTableNoMatchedPartition";
+    ResultSet res = tpch.execute(
+        "create table " + tableName + " (col4 text) USING csv " +
+            "WITH ('csvfile.delimiter'='|','compression.codec'='org.apache.hadoop.io.compress.DeflateCodec') " +
+            "partition by column(col1 int4, col2 int4, col3 float8)");
+    res.close();
+    TajoTestingCluster cluster = tpch.getTestingCluster();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    assertTrue(catalog.existsTable(tableName));
+
+    res = tpch.execute(
+        "insert overwrite into " + tableName +
+            " select  l_returnflag, l_orderkey, l_partkey, l_quantity from lineitem");
+    res.close();
+    TableDesc desc = catalog.getTableDesc(tableName);
+    assertEquals(5, desc.getStats().getNumRows().intValue());
+
+    FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration());
+    assertTrue(fs.exists(desc.getPath()));
+    CompressionCodecFactory factory = new CompressionCodecFactory(tpch.getTestingCluster().getConfiguration());
+
+    Path path = desc.getPath();
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0")));
+    assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0")));
+
+    for (FileStatus partition1 : fs.listStatus(path)){
+      assertTrue(fs.isDirectory(partition1.getPath()));
+      for (FileStatus partition2 : fs.listStatus(partition1.getPath())) {
+        assertTrue(fs.isDirectory(partition2.getPath()));
+        for (FileStatus partition3 : fs.listStatus(partition2.getPath())) {
+          assertTrue(fs.isDirectory(partition3.getPath()));
+          for (FileStatus file : fs.listStatus(partition3.getPath())) {
+            CompressionCodec codec = factory.getCodec(file.getPath());
+            assertTrue(codec instanceof DeflateCodec);
+          }
+        }
+      }
+    }
+
+    res = tpch.execute("select * from " + tableName + " where col2 = 9");
+    assertFalse(res.next());
+    res.close();
+  }
+}
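
For reference, the directories asserted in these tests follow the Hive-style column=value
layout that column partitioning writes out. A minimal, self-contained sketch of composing
such a path from partition column names and values (class and method names are hypothetical,
not part of this patch):

    public class PartitionPathSketch {
      /** Joins column=value pairs with '/', e.g. {"col1","col2"} + {"3","2"} -> "col1=3/col2=2". */
      public static String toPartitionPath(String[] columnNames, String[] values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < columnNames.length; i++) {
          if (i > 0) {
            sb.append('/');
          }
          sb.append(columnNames[i]).append('=').append(values[i]);
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        // Prints "col1=3/col2=2/col3=45.0", one of the directories asserted above.
        System.out.println(toPartitionPath(new String[]{"col1", "col2", "col3"},
                                           new String[]{"3", "2", "45.0"}));
      }
    }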

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
index 6b3d90f..3c4d3c2 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/util/TestTupleUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.util;
 
+import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -33,8 +34,7 @@ import org.apache.tajo.worker.dataserver.HttpUtil;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 public class TestTupleUtil {
   @Test
@@ -228,4 +228,52 @@ public class TestTupleUtil {
       assertEquals(range, result);
     }
   }
+
+  @Test
+  public void testBuildTupleFromPartitionPath() {
+
+    Schema schema = new Schema();
+    schema.addColumn("key1", Type.INT8);
+    schema.addColumn("key2", Type.TEXT);
+
+    Path path = new Path("hdfs://tajo/warehouse/partition_test/");
+    Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNull(tuple);
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNull(tuple);
+
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123");
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/part-0000"); // wrong cases;
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNull(tuple);
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNull(tuple);
+
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc");
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+
+    path = new Path("hdfs://tajo/warehouse/partition_test/key1=123/key2=abc/part-0001");
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+    assertNull(tuple);
+
+    tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, false);
+    assertNotNull(tuple);
+    assertEquals(DatumFactory.createInt8(123), tuple.get(0));
+    assertEquals(DatumFactory.createText("abc"), tuple.get(1));
+  }
 }
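
Each directory component below the table path is a column=value pair, and the tuple is built by
mapping those values onto the given partition schema; a trailing data file such as "part-0001"
is not a partition directory. A simplified, standalone illustration of that parsing (not the
actual TupleUtil implementation):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class PartitionPathParseSketch {
      public static Map<String, String> parse(String relativePath) {
        Map<String, String> keyValues = new LinkedHashMap<String, String>();
        for (String component : relativePath.split("/")) {
          int eq = component.indexOf('=');
          if (eq < 0) {
            break; // e.g. "part-0001" carries no partition value
          }
          keyValues.put(component.substring(0, eq), component.substring(eq + 1));
        }
        return keyValues;
      }

      public static void main(String[] args) {
        // Prints {key1=123, key2=abc}, mirroring the partition values read in the test above.
        System.out.println(parse("key1=123/key2=abc/part-0001"));
      }
    }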

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
index 5b6b5a3..4d78a27 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java
@@ -74,7 +74,7 @@ public class TestExecutionBlockCursor {
 
     analyzer = new SQLAnalyzer();
     logicalPlanner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
 
     AbstractStorageManager sm  = StorageManagerFactory.getStorageManager(conf);
     dispatcher = new AsyncDispatcher();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index ca1247a..0396f33 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -82,7 +82,7 @@ public class TestGlobalPlanner {
 
     sqlAnalyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(util.getConfiguration());
     globalPlanner = new GlobalPlanner(util.getConfiguration(),
         StorageManagerFactory.getStorageManager(util.getConfiguration()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 6934872..43ea5f8 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -79,7 +79,7 @@ public class TestRangeRetrieverHandler {
 
     analyzer = new SQLAnalyzer();
     planner = new LogicalPlanner(catalog);
-    optimizer = new LogicalOptimizer();
+    optimizer = new LogicalOptimizer(conf);
 
     schema = new Schema();
     schema.addColumn("empId", Type.INT4);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql b/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
index 65493a2..397e7ac 100644
--- a/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
+++ b/tajo-core/tajo-core-backend/src/test/queries/create_table_partition_by_column.sql
@@ -1,4 +1,4 @@
 CREATE TABLE sales ( col1 int, col2 int)
-PARTITION BY COLUMN (col1, col2, col3);
+PARTITION BY COLUMN (col3 int, col4 float, col5 text);
 
 


[2/3] TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables. (hyunsik)

Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 638c92a..d262bc3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.eval;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -31,8 +30,8 @@ import org.apache.tajo.exception.InternalException;
 import java.util.*;
 
 public class EvalTreeUtil {
-  public static void changeColumnRef(EvalNode node, String oldName,
-      String newName) {
+
+  public static void changeColumnRef(EvalNode node, String oldName, String newName) {
     node.postOrder(new ChangeColumnRefVisitor(oldName, newName));
   }
 
@@ -80,50 +79,9 @@ public class EvalTreeUtil {
     node.postOrder(finder);
     return finder.getColumnRefs();
   }
-  
-  /**
-   * Convert a list of conjunctive normal forms into a singleton expression.
-   *  
-   * @param cnfExprs
-   * @return The EvalNode object that merges all CNF-formed expressions.
-   */
-  public static EvalNode transformCNF2Singleton(EvalNode...cnfExprs) {
-    if (cnfExprs.length == 1) {
-      return cnfExprs[0];
-    }
-    
-    return transformCNF2Singleton_(cnfExprs, 0);
-  }
-  
-  private static EvalNode transformCNF2Singleton_(EvalNode [] evalNode, int idx) {
-    if (idx == evalNode.length - 2) {
-      return new BinaryEval(EvalType.AND, evalNode[idx], evalNode[idx + 1]);
-    } else {
-      return new BinaryEval(EvalType.AND, evalNode[idx],
-          transformCNF2Singleton_(evalNode, idx + 1));
-    }
-  }
-  
-  /**
-   * Get a list of exprs similar to CNF
-   * 
-   * @param expr The expression to be transformed to an array of CNF-formed expressions.
-   * @return An array of CNF-formed expressions
-   */
-  public static EvalNode [] getConjNormalForm(EvalNode expr) {
-    List<EvalNode> list = new ArrayList<EvalNode>();    
-    getConjNormalForm(expr, list);
-    return list.toArray(new EvalNode[list.size()]);
-  }
-  
-  private static void getConjNormalForm(EvalNode node, List<EvalNode> found) {
-    if (node.getType() == EvalType.AND) {
-      getConjNormalForm(node.getLeftExpr(), found);
-      getConjNormalForm(node.getRightExpr(), found);
-    } else {
-      found.add(node);
-    }
-  }
+
+
+
   
   public static Schema getSchemaByTargets(Schema inputSchema, Target [] targets) 
       throws InternalException {
@@ -234,16 +192,8 @@ public class EvalTreeUtil {
     }    
   }
 
-  public static boolean isComparisonOperator(EvalNode expr) {
-    return expr.getType() == EvalType.EQUAL ||
-        expr.getType() == EvalType.LEQ ||
-        expr.getType() == EvalType.LTH ||
-        expr.getType() == EvalType.GEQ ||
-        expr.getType() == EvalType.GTH;
-  }
-
   public static boolean isJoinQual(EvalNode expr) {
-    return isComparisonOperator(expr) &&
+    return AlgebraicUtil.isComparisonOperator(expr) &&
         expr.getLeftExpr().getType() == EvalType.FIELD &&
         expr.getRightExpr().getType() == EvalType.FIELD;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
index e2411e3..59e8b31 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
@@ -71,7 +71,7 @@ public class InEval extends BinaryEval {
 
     Datum value = tuple.get(fieldId);
     for (Datum datum : values) {
-      if (value.equals(datum)) {
+      if (value.equalsTo(datum).asBool()) {
         isIncluded = true;
         break;
       }
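
The change from equals() to equalsTo(...).asBool() matters when the IN-list datums and the
column value do not share the exact same datum type. A minimal sketch (class name hypothetical;
the printed results are expectations based on Tajo's value comparison, not output captured from
this patch):

    import org.apache.tajo.datum.Datum;
    import org.apache.tajo.datum.DatumFactory;

    public class DatumComparisonSketch {
      public static void main(String[] args) {
        Datum int4 = DatumFactory.createInt4(123);
        Datum int8 = DatumFactory.createInt8(123L);
        // Java equality is type-strict, while equalsTo() performs a value comparison and
        // returns a datum whose asBool() reflects the SQL-style result.
        System.out.println(int4.equals(int8));             // expected: false
        System.out.println(int4.equalsTo(int8).asBool());  // expected: true
      }
    }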

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
index ac7aeeb..e1c693e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
@@ -42,6 +42,10 @@ public class LikePredicateEval extends PatternMatchPredicateEval {
     this.compiled = Pattern.compile(regex, flags);
   }
 
+  public boolean isLeadingWildCard() {
+    return pattern.indexOf(".*") == 0;
+  }
+
   @Override
   public String toString() {
     return leftExpr.toString() + (caseInsensitive ? "ILIKE" : "LIKE") + "'" + pattern +"'";

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 2cd91f9..ad8acf1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -906,9 +906,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     }
 
     if (checkIfExist(ctx.table_partitioning_clauses())) {
-      CreateTable.PartitionOption partitionOption =
+      PartitionDescExpr partitionDesc =
           parseTablePartitioningClause(ctx.table_partitioning_clauses());
-      createTable.setPartition(partitionOption);
+      createTable.setPartition(partitionDesc);
     }
     return createTable;
   }
@@ -925,7 +925,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     return elements;
   }
 
-  public CreateTable.PartitionOption parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
+  public PartitionDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
 
     if (checkIfExist(ctx.range_partitions())) { // For Range Partition
       Range_partitionsContext rangePartitionsContext = ctx.range_partitions();
@@ -978,9 +978,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       return new ListPartition(getColumnReferences(ctx.list_partitions().column_reference_list()), specifiers);
 
     } else if (checkIfExist(ctx.column_partitions())) { // For Column Partition (Hive Style)
-      return new CreateTable.ColumnPartition(getColumnReferences(ctx.column_partitions().column_reference_list()));
+      return new CreateTable.ColumnPartition(getDefinitions(ctx.column_partitions().table_elements()), true);
     } else {
-      throw new SQLSyntaxError("Wrong partition option");
+      throw new SQLSyntaxError("Invalid Partition Type: " + ctx.toStringTree());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 9478648..0cee8dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -92,6 +92,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
       case SCAN:
         current = visitScan(context, plan, (ScanNode) node, stack);
         break;
+      case PARTITIONS_SCAN:
+        current = visitScan(context, plan, (ScanNode) node, stack);
+        break;
       case STORE:
         current = visitStoreTable(context, plan, (StoreTableNode) node, stack);
         break;
@@ -221,6 +224,11 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node, Stack<LogicalNode> stack) throws PlanningException {
+    return null;
+  }
+
+  @Override
   public RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
       throws PlanningException {
     stack.push(node);

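Note that the dispatch above routes PARTITIONS_SCAN through visitScan, since
PartitionedTableScanNode is a ScanNode subclass, while visitPartitionedTableScan is added as an
explicit hook with a no-op default. Under this dispatch, a visitor that wants to treat partition
scans specially can branch inside visitScan; a minimal sketch (class name, context type, and body
are illustrative only, not part of this patch):

    import java.util.List;
    import java.util.Stack;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
    import org.apache.tajo.engine.planner.LogicalPlan;
    import org.apache.tajo.engine.planner.PlanningException;
    import org.apache.tajo.engine.planner.logical.*;

    public class PartitionScanCollector extends BasicLogicalPlanVisitor<List<Path>, LogicalNode> {
      @Override
      public LogicalNode visitScan(List<Path> paths, LogicalPlan plan, ScanNode node,
                                   Stack<LogicalNode> stack) throws PlanningException {
        if (node.getType() == NodeType.PARTITIONS_SCAN) {
          // Record the pruned input paths attached to this partitioned table scan.
          for (Path path : ((PartitionedTableScanNode) node).getInputPaths()) {
            paths.add(path);
          }
        }
        return node;
      }
    }
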
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index 640383e..5f11f1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
 import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.logical.join.FoundJoinOrder;
@@ -32,6 +33,7 @@ import org.apache.tajo.engine.planner.logical.join.JoinGraph;
 import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm;
 import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
 import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
+import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 
 import java.util.Set;
@@ -49,12 +51,13 @@ public class LogicalOptimizer {
   private BasicQueryRewriteEngine rulesAfterToJoinOpt;
   private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
 
-  public LogicalOptimizer() {
+  public LogicalOptimizer(TajoConf systemConf) {
     rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
     rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
 
     rulesAfterToJoinOpt = new BasicQueryRewriteEngine();
     rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule());
+    rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf));
   }
 
   public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
@@ -130,7 +133,7 @@ public class LogicalOptimizer {
     public LogicalNode visitFilter(JoinGraphContext context, LogicalPlan plan, SelectionNode node,
                                    Stack<LogicalNode> stack) throws PlanningException {
       super.visitFilter(context, plan, node, stack);
-      context.quals.addAll(Lists.newArrayList(EvalTreeUtil.getConjNormalForm(node.getQual())));
+      context.quals.addAll(Lists.newArrayList(AlgebraicUtil.toConjunctiveNormalFormArray(node.getQual())));
       return node;
     }
 
@@ -239,7 +242,7 @@ public class LogicalOptimizer {
 
       double filterFactor = 1;
       if (joinNode.hasJoinQual()) {
-        EvalNode [] quals = EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual());
+        EvalNode [] quals = AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual());
         filterFactor = Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, quals.length);
       }
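
As a worked example of this estimate: toConjunctiveNormalFormArray splits AND-connected
predicates into individual quals, and the filter factor is the default selection factor raised
to the number of quals. A standalone sketch (the 0.1 value is an assumption for illustration,
not necessarily GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR):

    public class FilterFactorSketch {
      public static void main(String[] args) {
        double defaultSelectionFactor = 0.1; // assumed value, for illustration only
        int numCnfQuals = 3;                 // e.g. "a = 1 AND b > 2 AND c < 5" yields three quals
        double filterFactor = Math.pow(defaultSelectionFactor, numCnfQuals);
        System.out.println(filterFactor);    // approximately 0.001 under these assumptions
      }
    }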
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index c06b7a7..c6fcefc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -47,6 +47,8 @@ public interface LogicalPlanVisitor <CONTEXT, RESULT> {
       throws PlanningException;
   RESULT visitScan(CONTEXT context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
       throws PlanningException;
+  RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node,
+                                   Stack<LogicalNode> stack) throws PlanningException;
   RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
       throws PlanningException;
   RESULT visitInsert(CONTEXT context, LogicalPlan plan, InsertNode node, Stack<LogicalNode> stack)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 63b7985..605b9df 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
@@ -50,10 +50,13 @@ import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.util.TUtil;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Stack;
 
 import static org.apache.tajo.algebra.Aggregation.GroupType;
+import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
+import static org.apache.tajo.algebra.CreateTable.PartitionType;
 import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
 
@@ -407,7 +410,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     // 3. build this plan:
     EvalNode searchCondition = createEvalTree(plan, block, selection.getQual());
-    SelectionNode selectionNode = new SelectionNode(plan.newPID(), searchCondition);
+    EvalNode simplified = AlgebraicUtil.eliminateConstantExprs(searchCondition);
+    SelectionNode selectionNode = new SelectionNode(plan.newPID(), simplified);
 
     // 4. set child plan, update input/output schemas:
     selectionNode.setChild(child);
@@ -772,8 +776,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
       return storeNode;
     } else {
-      CreateTableNode createTableNode = new CreateTableNode(context.plan.newPID(), expr.getTableName(),
-          convertTableElementsSchema(expr.getTableElements()));
+
+      Schema tableSchema;
+      if (expr.hasPartition() && expr.getPartition().getPartitionType() == PartitionType.COLUMN &&
+          ((ColumnPartition)expr.getPartition()).isOmitValues()) {
+        ColumnDefinition [] merged = TUtil.concat(expr.getTableElements(),
+            ((ColumnPartition)expr.getPartition()).getColumns());
+        tableSchema = convertTableElementsSchema(merged);
+      } else {
+        tableSchema = convertTableElementsSchema(expr.getTableElements());
+      }
+
+      CreateTableNode createTableNode = new CreateTableNode(
+          context.plan.newPID(),
+          expr.getTableName(),
+          tableSchema);
 
       if (expr.isExternal()) {
         createTableNode.setExternal(true);
@@ -811,28 +828,28 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * @return
    * @throws PlanningException
    */
-  private Partitions convertTableElementsPartition(PlanContext context,
+  private PartitionDesc convertTableElementsPartition(PlanContext context,
                                                    CreateTable expr) throws PlanningException {
     Schema schema = convertTableElementsSchema(expr.getTableElements());
-    Partitions partitions = null;
+    PartitionDesc partitionDesc = null;
     List<Specifier> specifiers = null;
     if (expr.hasPartition()) {
-      partitions = new Partitions();
+      partitionDesc = new PartitionDesc();
       specifiers = TUtil.newList();
 
-      partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
+      partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
           .getPartitionType().name()));
 
-      if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.HASH)) {
+      if (expr.getPartition().getPartitionType().equals(PartitionType.HASH)) {
         CreateTable.HashPartition hashPartition = expr.getPartition();
 
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
             , hashPartition.getColumns()));
 
         if (hashPartition.getColumns() != null) {
           if (hashPartition.getQuantifier() != null) {
             String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
-            partitions.setNumPartitions(Integer.parseInt(quantity));
+            partitionDesc.setNumPartitions(Integer.parseInt(quantity));
           }
 
           if (hashPartition.getSpecifiers() != null) {
@@ -841,21 +858,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             }
           }
 
-          if (specifiers.isEmpty() && partitions.getNumPartitions() > 0) {
-            for (int i = 0; i < partitions.getNumPartitions(); i++) {
-              String partitionName = partitions.getPartitionsType().name() + "_" + expr
+          if (specifiers.isEmpty() && partitionDesc.getNumPartitions() > 0) {
+            for (int i = 0; i < partitionDesc.getNumPartitions(); i++) {
+              String partitionName = partitionDesc.getPartitionsType().name() + "_" + expr
                   .getTableName() + "_" + i;
               specifiers.add(new Specifier(partitionName));
             }
           }
 
           if (!specifiers.isEmpty())
-            partitions.setSpecifiers(specifiers);
+            partitionDesc.setSpecifiers(specifiers);
         }
-      } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.LIST)) {
+      } else if (expr.getPartition().getPartitionType().equals(PartitionType.LIST)) {
         CreateTable.ListPartition listPartition = expr.getPartition();
 
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
             , listPartition.getColumns()));
 
         if (listPartition.getSpecifiers() != null) {
@@ -876,12 +893,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             specifiers.add(specifier);
           }
           if (!specifiers.isEmpty())
-            partitions.setSpecifiers(specifiers);
+            partitionDesc.setSpecifiers(specifiers);
         }
-      } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.RANGE)) {
+      } else if (expr.getPartition().getPartitionType().equals(PartitionType.RANGE)) {
         CreateTable.RangePartition rangePartition = expr.getPartition();
 
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
             , rangePartition.getColumns()));
 
         if (rangePartition.getSpecifiers() != null) {
@@ -903,17 +920,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             specifiers.add(specifier);
           }
           if (!specifiers.isEmpty())
-            partitions.setSpecifiers(specifiers);
+            partitionDesc.setSpecifiers(specifiers);
         }
-      } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.COLUMN)) {
-        CreateTable.ColumnPartition columnPartition = expr.getPartition();
-
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
-            , columnPartition.getColumns()));
+      } else if (expr.getPartition().getPartitionType() == PartitionType.COLUMN) {
+        ColumnPartition columnPartition = expr.getPartition();
+        partitionDesc.setColumns(convertTableElementsSchema(columnPartition.getColumns()).getColumns());
+        partitionDesc.setOmitValues(columnPartition.isOmitValues());
       }
     }
 
-    return partitions;
+    return partitionDesc;
   }
 
 
@@ -933,7 +949,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return schema;
   }
 
-  private List<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
+  private Collection<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
                                                    ColumnReferenceExpr[] references) {
     List<Column> columnList = TUtil.newList();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index bbecd50..73395a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -138,7 +138,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
         return leftExec;
 
-      } case SCAN:
+      }
+      case PARTITIONS_SCAN:
+      case SCAN:
         leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
         return leftExec;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 6b23ed8..d541680 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -545,7 +545,7 @@ public class PlannerUtil {
    * @return true if two operands refers to columns and the operator is comparison,
    */
   public static boolean isJoinQual(EvalNode qual) {
-    if (EvalTreeUtil.isComparisonOperator(qual)) {
+    if (AlgebraicUtil.isComparisonOperator(qual)) {
       List<Column> left = EvalTreeUtil.findAllColumnRefs(qual.getLeftExpr());
       List<Column> right = EvalTreeUtil.findAllColumnRefs(qual.getRightExpr());
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
index a49451c..ac50c46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -48,6 +48,10 @@ public class Target implements Cloneable, GsonObject {
     return !hasAlias() ? column.getQualifiedName() : alias;
   }
 
+  public final void setExpr(EvalNode expr) {
+    this.expr = expr;
+  }
+
   public final void setAlias(String alias) {
     this.alias = alias;
     this.column = new Column(alias, expr.getValueType());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 4f936fc..4f3976e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -53,6 +53,10 @@ public class ExecutionBlock {
     this.scanlist.clear();
     this.plan = plan;
 
+    if (plan == null) {
+      return;
+    }
+
     LogicalNode node = plan;
     ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
     s.add(node);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 6ef35ce..abf5620 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -112,7 +112,7 @@ public class GlobalPlanner {
   }
 
   private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
-                                          ExecutionBlock leftBlock, ExecutionBlock rightBlock)
+                                       ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 942309d..d0c8373 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
@@ -36,7 +36,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
   @Expose private Path path;
   @Expose private Options options;
   @Expose private boolean external;
-  @Expose private Partitions partitions;
+  @Expose private PartitionDesc partitionDesc;
 
   public CreateTableNode(int pid, String tableName, Schema schema) {
     super(pid, NodeType.CREATE_TABLE);
@@ -92,16 +92,16 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     this.external = external;
   }
 
-  public Partitions getPartitions() {
-    return partitions;
+  public PartitionDesc getPartitions() {
+    return partitionDesc;
   }
 
-  public void setPartitions(Partitions partitions) {
-    this.partitions = partitions;
+  public void setPartitions(PartitionDesc partitionDesc) {
+    this.partitionDesc = partitionDesc;
   }
 
   public boolean hasPartition() {
-    return this.partitions != null;
+    return this.partitionDesc != null;
   }
 
   @Override
@@ -121,7 +121,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
           && TUtil.checkEquals(path, other.path)
           && TUtil.checkEquals(options, other.options)
           && TUtil.checkEquals(partitionKeys, other.partitionKeys)
-          && TUtil.checkEquals(partitions, other.partitions);
+          && TUtil.checkEquals(partitionDesc, other.partitionDesc);
     } else {
       return false;
     }
@@ -137,7 +137,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     store.path = path != null ? new Path(path.toString()) : null;
     store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
     store.options = (Options) (options != null ? options.clone() : null);
-    store.partitions = (Partitions) (partitions != null ? partitions.clone() : null);
+    store.partitionDesc = (PartitionDesc) (partitionDesc != null ? partitionDesc.clone() : null);
     return store;
   }
   
@@ -157,7 +157,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     sb.append(",\"storeType\": \"" + this.storageType);
     sb.append(",\"path\" : \"" + this.path).append("\",");
     sb.append(",\"external\" : \"" + this.external).append("\",");
-    sb.append(",\"partitions\" : \"" + this.partitions).append("\",");
+    sb.append(",\"partitions\" : \"" + this.partitionDesc).append("\",");
     
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
     .append("\n  \"in schema\": ").append(getInSchema())

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 44790ec..eaaf0c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -37,6 +37,7 @@ public enum NodeType {
   INTERSECT(IntersectNode.class),
   LIMIT(LimitNode.class),
   JOIN(JoinNode.class),
+  PARTITIONS_SCAN(PartitionedTableScanNode.class),
   PROJECTION(ProjectionNode.class),
   ROOT(LogicalRootNode.class),
   SCAN(ScanNode.class),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
new file mode 100644
index 0000000..21f5ef4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.util.TUtil;
+
+public class PartitionedTableScanNode extends ScanNode {
+  @Expose Path [] inputPaths;
+
+  public PartitionedTableScanNode(int pid, ScanNode scanNode, Path[] inputPaths) {
+    super(pid, NodeType.PARTITIONS_SCAN, scanNode.getTableDesc());
+    this.setInSchema(scanNode.getInSchema());
+    this.setOutSchema(scanNode.getOutSchema());
+    this.alias = scanNode.alias;
+    this.renamedSchema = scanNode.renamedSchema;
+    this.qual = scanNode.qual;
+    this.targets = scanNode.targets;
+    this.inputPaths = inputPaths;
+  }
+
+  public void setInputPaths(Path [] paths) {
+    this.inputPaths = paths;
+  }
+
+  public Path [] getInputPaths() {
+    return inputPaths;
+  }
+	
+	public String toString() {
+	  StringBuilder sb = new StringBuilder();	  
+	  sb.append("\"Partitions Scan\" : {\"table\":\"")
+	  .append(getTableName()).append("\"");
+	  if (hasAlias()) {
+	    sb.append(",\"alias\": \"").append(alias);
+	  }
+	  
+	  if (hasQual()) {
+	    sb.append(", \"qual\": \"").append(this.qual).append("\"");
+	  }
+	  
+	  if (hasTargets()) {
+	    sb.append(", \"target list\": ");
+      boolean first = true;
+      for (Target target : targets) {
+        if (!first) {
+          sb.append(", ");
+        }
+        sb.append(target);
+        first = false;
+      }
+	  }
+
+    if (inputPaths != null) {
+      sb.append(", \"Partition paths\": ");
+      for (Path path : inputPaths) {
+        sb.append("\n ");
+        sb.append(path);
+      }
+      sb.append("\n");
+    }
+
+	  sb.append(",");
+	  sb.append("\n  \"out schema\": ").append(getOutSchema());
+	  sb.append("\n  \"in schema\": ").append(getInSchema());
+	  return sb.toString();
+	}
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(this.tableDesc, this.qual, this.targets);
+  }
+	
+	@Override
+	public boolean equals(Object obj) {
+	  if (obj instanceof PartitionedTableScanNode) {
+	    PartitionedTableScanNode other = (PartitionedTableScanNode) obj;
+	    
+	    boolean eq = super.equals(other); 
+	    eq = eq && TUtil.checkEquals(this.tableDesc, other.tableDesc);
+	    eq = eq && TUtil.checkEquals(this.qual, other.qual);
+	    eq = eq && TUtil.checkEquals(this.targets, other.targets);
+      eq = eq && TUtil.checkEquals(this.inputPaths, other.inputPaths);
+	    
+	    return eq;
+	  }	  
+	  
+	  return false;
+	}	
+	
+	@Override
+	public Object clone() throws CloneNotSupportedException {
+	  PartitionedTableScanNode unionScan = (PartitionedTableScanNode) super.clone();
+	  
+	  unionScan.tableDesc = (TableDesc) this.tableDesc.clone();
+	  
+	  if (hasQual()) {
+	    unionScan.qual = (EvalNode) this.qual.clone();
+	  }
+	  
+	  if (hasTargets()) {
+	    unionScan.targets = new Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        unionScan.targets[i] = (Target) targets[i].clone();
+      }
+	  }
+
+    unionScan.inputPaths = inputPaths;
+
+    return unionScan;
+	}
+	
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+	
+	public void postOrder(LogicalNodeVisitor visitor) {        
+    visitor.visit(this);
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("Scan on ").appendTitle(getTableName());
+    if (hasAlias()) {
+      planStr.appendTitle(" as ").appendTitle(alias);
+    }
+
+    if (hasQual()) {
+      planStr.addExplan("filter: ").appendExplain(this.qual.toString());
+    }
+
+    if (hasTargets()) {
+      planStr.addExplan("target list: ");
+      boolean first = true;
+      for (Target target : targets) {
+        if (!first) {
+          planStr.appendExplain(", ");
+        }
+        planStr.appendExplain(target.toString());
+        first = false;
+      }
+    }
+
+    if (inputPaths != null) {
+      planStr.addExplan("Path list: ");
+      int i = 0;
+      for (Path path : inputPaths) {
+        planStr.addExplan((i++) + ": ").appendExplain(path.toString());
+      }
+    }
+
+    planStr.addDetail("out schema: ").appendDetail(getOutSchema().toString());
+    planStr.addDetail("in schema: ").appendDetail(getInSchema().toString());
+
+    return planStr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
index 6a30aa6..5ea61b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
@@ -24,7 +24,7 @@ public abstract class RelationNode extends LogicalNode {
 
   public RelationNode(int pid, NodeType nodeType) {
     super(pid, nodeType);
-    assert(nodeType == NodeType.SCAN || nodeType == NodeType.TABLE_SUBQUERY);
+    assert(nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.TABLE_SUBQUERY);
   }
 
   public abstract String getTableName();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 04c7b5a..cd9c1f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -29,11 +29,16 @@ import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
 public class ScanNode extends RelationNode implements Projectable {
-	@Expose private TableDesc tableDesc;
-  @Expose private String alias;
-  @Expose private Schema renamedSchema;
-	@Expose private EvalNode qual;
-	@Expose private Target[] targets;
+	@Expose protected TableDesc tableDesc;
+  @Expose protected String alias;
+  @Expose protected Schema renamedSchema;
+	@Expose protected EvalNode qual;
+	@Expose protected Target[] targets;
+
+  protected ScanNode(int pid, NodeType nodeType, TableDesc desc) {
+    super(pid, nodeType);
+    this.tableDesc = desc;
+  }
 
   public ScanNode(int pid, TableDesc desc) {
     super(pid, NodeType.SCAN);
@@ -46,7 +51,7 @@ public class ScanNode extends RelationNode implements Projectable {
     this(pid, desc);
     this.alias = PlannerUtil.normalizeTableName(alias);
     renamedSchema = getOutSchema();
-    renamedSchema.setQualifier(this.alias, true);
+    renamedSchema.setQualifier(this.alias);
 	}
 	
 	public String getTableName() {
@@ -119,7 +124,7 @@ public class ScanNode extends RelationNode implements Projectable {
         first = false;
       }
 	  }
-	  
+
 	  sb.append(",");
 	  sb.append("\n  \"out schema\": ").append(getOutSchema());
 	  sb.append("\n  \"in schema\": ").append(getInSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index b2bd937..634fa3a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
@@ -39,17 +39,17 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   @Expose private Options options;
   @Expose private boolean isCreatedTable = false;
   @Expose private boolean isOverwritten = false;
-  @Expose private Partitions partitions;
+  @Expose private PartitionDesc partitionDesc;
 
   public StoreTableNode(int pid, String tableName) {
     super(pid, NodeType.STORE);
     this.tableName = tableName;
   }
 
-  public StoreTableNode(int pid, String tableName, Partitions partitions) {
+  public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
     super(pid, NodeType.STORE);
     this.tableName = tableName;
-    this.partitions = partitions;
+    this.partitionDesc = partitionDesc;
   }
 
   public final String getTableName() {
@@ -109,12 +109,12 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     return this.options;
   }
 
-  public Partitions getPartitions() {
-    return partitions;
+  public PartitionDesc getPartitions() {
+    return partitionDesc;
   }
 
-  public void setPartitions(Partitions partitions) {
-    this.partitions = partitions;
+  public void setPartitions(PartitionDesc partitionDesc) {
+    this.partitionDesc = partitionDesc;
   }
 
   @Override
@@ -146,7 +146,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
       eq = eq && TUtil.checkEquals(options, other.options);
       eq = eq && isCreatedTable == other.isCreatedTable;
       eq = eq && isOverwritten == other.isOverwritten;
-      eq = eq && TUtil.checkEquals(partitions, other.partitions);
+      eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
       return eq;
     } else {
       return false;
@@ -163,7 +163,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     store.options = options != null ? (Options) options.clone() : null;
     store.isCreatedTable = isCreatedTable;
     store.isOverwritten = isOverwritten;
-    store.partitions = partitions;
+    store.partitionDesc = partitionDesc;
     return store;
   }
 
@@ -188,8 +188,8 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
     .append("\n  \"in schema\": ").append(getInSchema());
 
-    if(partitions != null) {
-      sb.append(partitions.toString());
+    if(partitionDesc != null) {
+      sb.append(partitionDesc.toString());
     }
 
     sb.append("}");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index d1f0986..335d12f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -35,7 +35,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
     this.tableName = PlannerUtil.normalizeTableName(tableName);
     this.subQuery = subQuery;
     setOutSchema((Schema) this.subQuery.getOutSchema().clone());
-    getOutSchema().setQualifier(this.tableName, true);
+    getOutSchema().setQualifier(this.tableName);
     setInSchema((Schema) this.subQuery.getInSchema().clone());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
index bd1b8d3..cbdad1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
 
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
@@ -90,7 +90,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
     joinNode.setInSchema(mergedSchema);
     joinNode.setOutSchema(mergedSchema);
     if (joinEdge.hasJoinQual()) {
-      joinNode.setJoinQual(EvalTreeUtil.transformCNF2Singleton(joinEdge.getJoinQual()));
+      joinNode.setJoinQual(AlgebraicUtil.createSingletonExprFromCNF(joinEdge.getJoinQual()));
     }
     return joinNode;
   }
@@ -206,7 +206,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
       double filterFactor = 1;
       if (joinNode.hasJoinQual()) {
         filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR,
-            EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()).length);
+            AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()).length);
         return getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()) * filterFactor;
       } else {
         return Math.pow(getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()), 2);
@@ -215,7 +215,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
     case SELECTION:
       SelectionNode selectionNode = (SelectionNode) node;
       return getCost(selectionNode.getChild()) *
-          Math.pow(DEFAULT_SELECTION_FACTOR, EvalTreeUtil.getConjNormalForm(selectionNode.getQual()).length);
+          Math.pow(DEFAULT_SELECTION_FACTOR, AlgebraicUtil.toConjunctiveNormalFormArray(selectionNode.getQual()).length);
 
     case TABLE_SUBQUERY:
       TableSubQueryNode subQueryNode = (TableSubQueryNode) node;
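
In the cost model above, each conjunct of the join or selection qualifier multiplies the child cost by DEFAULT_SELECTION_FACTOR once, so the estimate shrinks geometrically with the number of CNF terms. A tiny standalone illustration of that arithmetic; the factor value 0.1 below is an assumption for the example, not necessarily Tajo's constant:

  public class SelectivityExample {
    static final double SELECTION_FACTOR = 0.1;  // assumed value, for illustration only

    // estimated cost of filtering an input of the given cost with 'conjuncts' CNF terms
    static double filteredCost(double inputCost, int conjuncts) {
      return inputCost * Math.pow(SELECTION_FACTOR, conjuncts);
    }

    public static void main(String[] args) {
      // inner join of two inputs costing 1,000 each, with a join qual of two conjuncts
      System.out.println(filteredCost(1000d * 1000d, 2));  // 10000.0: two conjuncts cut the cross product by 100x
    }
  }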

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index 66c82f3..74ef38a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
 
 import com.google.common.collect.Sets;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.LogicalPlan;
@@ -35,7 +36,7 @@ import java.util.Set;
 public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
   public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
                                       JoinNode joinNode) throws PlanningException {
-    Set<EvalNode> cnf = Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()));
+    Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
     Set<EvalNode> nonJoinQuals = Sets.newHashSet();
     for (EvalNode singleQual : cnf) {
       if (PlannerUtil.isJoinQual(singleQual)) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index f7db4bd..db7e566 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
@@ -45,6 +47,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
 /**
  * This class is a physical operator to store at column partitioned table.
  */
@@ -56,15 +60,11 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
   private Tuple tuple;
   private Path storeTablePath;
   private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();
-  private int[] columnIndexes;
-  private String[] columnNames;
-
+  private int[] partitionColumnIndices;
+  private String[] partitionColumnNames;
 
-  /**
-   * @throws java.io.IOException
-   *
-   */
-  public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
+  public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
+      throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
 
@@ -75,18 +75,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
 
+    // Rewrite the output schema because we do not need to store field values
+    // corresponding to partition key columns.
+    if (plan.getPartitions() != null && plan.getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+      rewriteColumnPartitionedTableSchema();
+    }
+
     // Find column index to name subpartition directory path
     if (this.plan.getPartitions() != null) {
       if (this.plan.getPartitions().getColumns() != null) {
-        columnIndexes = new int[plan.getPartitions().getColumns().size()];
-        columnNames = new String[columnIndexes.length];
-        for(int i = 0; i < plan.getPartitions().getColumns().size(); i++)  {
-          Column targetColumn = plan.getPartitions().getColumn(i);
+        partitionColumnIndices = new int[plan.getPartitions().getColumns().size()];
+        partitionColumnNames = new String[partitionColumnIndices.length];
+        Schema columnPartitionSchema = plan.getPartitions().getSchema();
+        for(int i = 0; i < columnPartitionSchema.getColumnNum(); i++)  {
+          Column targetColumn = columnPartitionSchema.getColumn(i);
           for(int j = 0; j < plan.getInSchema().getColumns().size();j++) {
             Column inputColumn = plan.getInSchema().getColumn(j);
             if (inputColumn.getColumnName().equals(targetColumn.getColumnName())) {
-              columnIndexes[i] = j;
-              columnNames[i] = targetColumn.getColumnName();
+              partitionColumnIndices[i] = j;
+              partitionColumnNames[i] = targetColumn.getColumnName();
             }
           }
         }
@@ -94,6 +101,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
     }
   }
 
+  /**
+   * This method rewrites the output schema of a column-partitioned table because
+   * its data files contain no actual field values for partition key columns.
+   * So, this method removes the partition key columns from the output schema.
+   */
+  private void rewriteColumnPartitionedTableSchema() {
+    PartitionDesc partitionDesc = plan.getPartitions();
+    Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+    columnPartitionSchema.setQualifier(plan.getTableName());
+
+    Schema modifiedOutputSchema = new Schema();
+    for (Column column : outSchema.toArray()) {
+      if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
+        modifiedOutputSchema.addColumn(column);
+      }
+    }
+    outSchema = modifiedOutputSchema;
+  }
+
   public void init() throws IOException {
     super.init();
 
@@ -124,8 +150,7 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
         LOG.info("File size: " + status.getLen());
       }
 
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
-          outSchema, dataFile);
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
       appender.enableStats();
       appender.init();
       appenderMap.put(partition, appender);
@@ -148,12 +173,12 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
     while((tuple = child.next()) != null) {
       // set subpartition directory name
       sb.delete(0, sb.length());
-      if (columnIndexes != null) {
-        for(int i = 0; i < columnIndexes.length; i++) {
-          Datum datum = (Datum) tuple.get(columnIndexes[i]);
+      if (partitionColumnIndices != null) {
+        for(int i = 0; i < partitionColumnIndices.length; i++) {
+          Datum datum = tuple.get(partitionColumnIndices[i]);
           if(i > 0)
             sb.append("/");
-          sb.append(columnNames[i]).append("=");
+          sb.append(partitionColumnNames[i]).append("=");
           sb.append(datum.asChars());
         }
       }
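
The loop above turns each row's partition column values into a Hive-style sub-directory name such as key1=value1/key2=value2, while the rewritten output schema keeps those values out of the data files themselves. A self-contained sketch of just the path-building convention; the column names and values are sample data, not taken from any real table:

  public class PartitionPathExample {
    // builds e.g. "country=KR/year=2013" from parallel arrays of column names and values
    static String partitionSubPath(String [] columnNames, String [] values) {
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < columnNames.length; i++) {
        if (i > 0) {
          sb.append("/");
        }
        sb.append(columnNames[i]).append("=").append(values[i]);
      }
      return sb.toString();
    }

    public static void main(String[] args) {
      System.out.println(partitionSubPath(new String[]{"country", "year"}, new String[]{"KR", "2013"}));
      // prints: country=KR/year=2013
    }
  }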

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index b799095..4e6cd64 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -27,9 +27,9 @@ import java.io.IOException;
 
 public abstract class PhysicalExec implements SchemaObject {
   protected final TaskAttemptContext context;
-  protected final Schema inSchema;
-  protected final Schema outSchema;
-  protected final int outColumnNum;
+  protected Schema inSchema;
+  protected Schema outSchema;
+  protected int outColumnNum;
 
   public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
                       final Schema outSchema) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 5783080..d17f7ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,15 +18,16 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.ScanNode;
@@ -34,8 +35,11 @@ import org.apache.tajo.storage.*;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
 public class SeqScanExec extends PhysicalExec {
   private final ScanNode plan;
   private Scanner scanner = null;
@@ -63,8 +67,67 @@ public class SeqScanExec extends PhysicalExec {
     }
   }
 
+  /**
+   * This method rewrites the input schema of a column-partitioned table because
+   * its data files contain no actual field values for partition key columns.
+   * So, this method removes the partition key columns from the input schema.
+   *
+   * TODO - This implementation assumes that a fragment is always a FileFragment.
+   * In a column-partitioned table, the path plays an important role in indicating
+   * the partition keys. That holds for now, but it should be fixed later.
+   */
+  private void rewriteColumnPartitionedTableSchema() throws IOException {
+    PartitionDesc partitionDesc = plan.getTableDesc().getPartitions();
+    Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+    List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+
+    // Get a partition key value from a given path
+    Tuple partitionRow =
+        TupleUtil.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), false);
+
+    // Remove partition key columns from an input schema.
+    columnPartitionSchema.setQualifier(inSchema.getColumn(0).getQualifier());
+    Schema modifiedInputSchema = new Schema();
+    for (Column column : inSchema.toArray()) {
+      if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
+        modifiedInputSchema.addColumn(column);
+      }
+    }
+    this.inSchema = modifiedInputSchema;
+
+    // Targets or search conditions may contain references to partition columns.
+    // However, their actual values are absent in tuples, so replace such column references with constant datums.
+    for (Column column : columnPartitionSchema.toArray()) {
+      FieldEval targetExpr = new FieldEval(column);
+      EvalContext evalContext = targetExpr.newContext();
+      targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
+      Datum datum = targetExpr.terminate(evalContext);
+      ConstEval constExpr = new ConstEval(datum);
+      for (Target target : plan.getTargets()) {
+        if (target.getEvalTree().equals(targetExpr)) {
+          if (!target.hasAlias()) {
+            target.setAlias(target.getEvalTree().getName());
+          }
+          target.setExpr(constExpr);
+        } else {
+          EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
+        }
+      }
+
+      if (plan.hasQual()) {
+        EvalTreeUtil.replace(plan.getQual(), targetExpr, constExpr);
+      }
+    }
+  }
+
   public void init() throws IOException {
     Schema projected;
+
+    if (plan.getTableDesc().hasPartitions()
+        && plan.getTableDesc().getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+      rewriteColumnPartitionedTableSchema();
+    }
+
     if (plan.hasTargets()) {
       projected = new Schema();
       Set<Column> columnSet = new HashSet<Column>();
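
Since partition key values exist only in the directory names, SeqScanExec recovers them from the fragment path and then substitutes ConstEval nodes for the corresponding column references in targets and quals. A toy sketch of the value-recovery half under the name=value/name=value layout shown above; the helper below is illustrative and is not the Tajo TupleUtil API:

  import java.util.LinkedHashMap;
  import java.util.Map;

  public class PartitionValueExample {
    // parses "country=KR/year=2013" into {country=KR, year=2013}
    static Map<String, String> parsePartitionPath(String subPath) {
      Map<String, String> values = new LinkedHashMap<String, String>();
      for (String segment : subPath.split("/")) {
        String [] kv = segment.split("=", 2);
        values.put(kv[0], kv[1]);
      }
      return values;
    }

    public static void main(String[] args) {
      System.out.println(parsePartitionPath("country=KR/year=2013"));
      // prints {country=KR, year=2013}; these values become the constants substituted into the plan
    }
  }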

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 7673253..817e48a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -22,10 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.PlannerUtil;
@@ -65,7 +62,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
   @Override
   public LogicalNode visitFilter(Set<EvalNode> cnf, LogicalPlan plan, SelectionNode selNode, Stack<LogicalNode> stack)
       throws PlanningException {
-    cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(selNode.getQual())));
+    cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
 
     stack.push(selNode);
     visitChild(cnf, plan, selNode.getChild(), stack);
@@ -178,15 +175,15 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
           EvalNode qual2 = null;
           if (matched2.size() > 1) {
              // merged into one eval tree
-             qual2 = EvalTreeUtil.transformCNF2Singleton(
-                        matched2.toArray(new EvalNode [matched2.size()]));
+             qual2 = AlgebraicUtil.createSingletonExprFromCNF(
+                 matched2.toArray(new EvalNode[matched2.size()]));
           } else if (matched2.size() == 1) {
              // if the number of matched expr is one
              qual2 = matched2.get(0);
           }
 
           if (qual2 != null) {
-             EvalNode conjQual2 = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual2);
+             EvalNode conjQual2 = AlgebraicUtil.createSingletonExprFromCNF(joinNode.getJoinQual(), qual2);
              joinNode.setJoinQual(conjQual2);
              cnf.removeAll(matched2);
           } // for the remaining cnf, push it as usual
@@ -194,7 +191,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
     }
 
     if (joinNode.hasJoinQual()) {
-      cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual())));
+      cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
     }
 
     visitChild(cnf, plan, left, stack);
@@ -210,7 +207,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
     EvalNode qual = null;
     if (matched.size() > 1) {
       // merged into one eval tree
-      qual = EvalTreeUtil.transformCNF2Singleton(
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
           matched.toArray(new EvalNode[matched.size()]));
     } else if (matched.size() == 1) {
       // if the number of matched expr is one
@@ -243,8 +240,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
     EvalNode qual = null;
     if (matched.size() > 1) {
       // merged into one eval tree
-      qual = EvalTreeUtil.transformCNF2Singleton(
-          matched.toArray(new EvalNode [matched.size()]));
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
+          matched.toArray(new EvalNode[matched.size()]));
     } else if (matched.size() == 1) {
       // if the number of matched expr is one
       qual = matched.get(0);
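
The renamed AlgebraicUtil helpers used throughout this rule split a predicate into its top-level AND conjuncts (toConjunctiveNormalFormArray) and merge a set of conjuncts back into one expression (createSingletonExprFromCNF). A minimal string-level analogue of that round trip, just to make the two directions concrete; it is not the Tajo EvalNode API and ignores quoting and nesting:

  import java.util.Arrays;

  public class CnfRoundTripExample {
    // split a predicate string on top-level AND, in the spirit of toConjunctiveNormalFormArray
    static String [] toConjuncts(String predicate) {
      return predicate.split("\\s+AND\\s+");
    }

    // join conjuncts back into one predicate, in the spirit of createSingletonExprFromCNF
    static String toSingleton(String [] conjuncts) {
      return String.join(" AND ", conjuncts);
    }

    public static void main(String[] args) {
      String [] cnf = toConjuncts("col1 = 'A' AND col3 = 'C'");
      System.out.println(Arrays.toString(cnf));  // [col1 = 'A', col3 = 'C']
      System.out.println(toSingleton(cnf));      // col1 = 'A' AND col3 = 'C'
    }
  }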

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
new file mode 100644
index 0000000..561424f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class PartitionedTableRewriter implements RewriteRule {
+  private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
+
+  private static final String NAME = "Partitioned Table Rewriter";
+  private final Rewriter rewriter = new Rewriter();
+
+  private final TajoConf systemConf;
+
+  public PartitionedTableRewriter(TajoConf conf) {
+    systemConf = conf;
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      for (RelationNode relation : block.getRelations()) {
+        if (relation.getType() == NodeType.SCAN) {
+          TableDesc table = ((ScanNode)relation).getTableDesc();
+          if (table.hasPartitions()) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    boolean containsPartitionedTables;
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      containsPartitionedTables = false;
+      for (RelationNode relation : block.getRelations()) {
+        if (relation.getType() == NodeType.SCAN) {
+          TableDesc table = ((ScanNode)relation).getTableDesc();
+          if (table.hasPartitions()) {
+            containsPartitionedTables = true;
+          }
+        }
+      }
+      if (containsPartitionedTables) {
+        rewriter.visitChild(block, plan, block.getRoot(), new Stack<LogicalNode>());
+      }
+    }
+    return plan;
+  }
+
+  private static class PartitionPathFilter implements PathFilter {
+    private FileSystem fs;
+    private Schema schema;
+    private EvalNode partitionFilter;
+    private EvalContext evalContext;
+
+
+    public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
+      this.schema = schema;
+      this.partitionFilter = partitionFilter;
+      evalContext = partitionFilter.newContext();
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+      if (tuple == null) { // if it is a file or not acceptable file
+        return false;
+      }
+      partitionFilter.eval(evalContext, schema, tuple);
+      return partitionFilter.terminate(evalContext).asBool();
+    }
+
+    @Override
+    public String toString() {
+      return partitionFilter.toString();
+    }
+  }
+
+  /**
+   * It assumes that each conjunctive form refers to exactly one partition column.
+   *
+   * @param partitionColumns the schema consisting of the partition columns
+   * @param conjunctiveForms search conditions corresponding to partition columns.
+   *                         If it is NULL, there is no search condition for this table.
+   * @param tablePath the path of the partitioned table
+   * @return the partition paths that satisfy the given conditions
+   * @throws IOException
+   */
+  private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
+      throws IOException {
+
+    FileSystem fs = tablePath.getFileSystem(systemConf);
+
+    PathFilter [] filters;
+    if (conjunctiveForms == null) {
+      filters = buildAllAcceptingPathFilters(partitionColumns);
+    } else {
+      filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
+    }
+
+    // loop from one to the number of partition columns
+    Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
+
+    for (int i = 1; i < partitionColumns.getColumnNum(); i++) {
+      // Get all file statuses matched to the i-th level path filter.
+      filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
+    }
+
+    return filteredPaths;
+  }
+
+  /**
+   * Build path filters for all levels with a list of filter conditions.
+   *
+   * For example, consider a table partitioned by three columns (i.e., col1, col2, col3).
+   * Then, this method creates three path filters for (col1), (col1, col2), and (col1, col2, col3).
+   *
+   * The corresponding filter conditions are placed on each path filter.
+   * If there is no corresponding expression for a certain column,
+   * the condition is filled with an always-true predicate.
+   *
+   * Assume that a user gives the condition WHERE col1 = 'A' AND col3 = 'C'.
+   * There is no filter condition corresponding to col2.
+   * Then, the path filter conditions correspond to the following:
+   *
+   * The first path filter: col1 = 'A'
+   * The second path filter: col1 = 'A' AND col2 IS NOT NULL
+   * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
+   *
+   * The 'IS NOT NULL' predicate is always true against a partition path.
+   *
+   * @param partitionColumns the schema consisting of the partition columns
+   * @param conjunctiveForms the search conditions in conjunctive normal form
+   * @return an array of path filters, one per partition level
+   */
+  private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
+                                                     EvalNode [] conjunctiveForms) {
+    // Building partition path filters for all levels
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.getColumnNum()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.getColumnNum(); i++) { // loop over all partition column levels
+      target = partitionColumns.getColumn(i);
+
+      for (EvalNode expr : conjunctiveForms) {
+        if (EvalTreeUtil.findDistinctRefColumns(expr).contains(target)) {
+          // Accumulate one qual per level
+          accumulatedFilters.add(expr);
+        }
+      }
+
+      if (accumulatedFilters.size() < (i + 1)) {
+        accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+      }
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  /**
+   * Build an array of path filters for all levels with an all-accepting filter condition.
+   * @param partitionColumns The partition columns schema
+   * @return The array of path filters, accepting all partition paths.
+   */
+  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.getColumnNum()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.getColumnNum(); i++) { // loop over all partition column levels
+      target = partitionColumns.getColumn(i);
+      accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+    Path [] paths = new Path[fileStatuses.length];
+    for (int j = 0; j < fileStatuses.length; j++) {
+      paths[j] = fileStatuses[j].getPath();
+    }
+    return paths;
+  }
+
+  private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
+    TableDesc table = scanNode.getTableDesc();
+    FileSystem fs = table.getPath().getFileSystem(systemConf);
+    LOG.info("Partitioned Table Dir: " + table.getPath());
+    LOG.info("Summary: " + fs.getContentSummary(table.getPath()).getDirectoryCount());
+    PartitionDesc partitionDesc = scanNode.getTableDesc().getPartitions();
+
+    Schema partitionValuesSchema = new Schema();
+    for (Column column : partitionDesc.getColumns()) {
+      partitionValuesSchema.addColumn(column);
+    }
+
+    Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
+
+    // if a query statement has a search condition, try to find indexable predicates
+    if (scanNode.hasQual()) {
+      EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
+      Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
+
+      // add a qualifier to the schema so that it matches the qual's column references
+      partitionValuesSchema.setQualifier(scanNode.getCanonicalName());
+      for (Column column : partitionValuesSchema.getColumns()) {
+        for (EvalNode simpleExpr : conjunctiveForms) {
+          if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
+            indexablePredicateSet.add(simpleExpr);
+          }
+        }
+      }
+
+      // Partitions that do not match the partition filter conditions are pruned immediately.
+      // So, the partition filter conditions are no longer necessary, and they are removed from
+      // the original search condition for simplicity and efficiency.
+      remainExprs.removeAll(indexablePredicateSet);
+      if (remainExprs.isEmpty()) {
+        scanNode.setQual(null);
+      } else {
+        scanNode.setQual(
+            AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
+      }
+    }
+
+    if (indexablePredicateSet.size() > 0) { // There is at least one indexable predicate
+      return findFilteredPaths(partitionValuesSchema,
+          indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
+    } else { // otherwise, we will get all partition paths.
+      return findFilteredPaths(partitionValuesSchema, null, table.getPath());
+    }
+  }
+
+  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+    if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
+      Set<Column> variables = EvalTreeUtil.findDistinctRefColumns(evalNode);
+      // if it contains only single variable matched to a target column
+      return variables.size() == 1 && variables.contains(targetColumn);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Check if an expression consists of one variable and one constant, and
+   * the expression is a comparison operator.
+   *
+   * @param evalNode The expression to be checked
+   * @return true if the expression consists of one variable and one constant
+   * and is a comparison operator. Otherwise, false.
+   */
+  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+    // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
+    return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
+  }
+
+  /**
+   * Check if an expression is a disjunction of indexable predicates on the same variable.
+   * @param evalNode The expression to be checked
+   * @return true if the expression is such a disjunction. Otherwise, false.
+   */
+  private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
+    if (evalNode.getType() == EvalType.OR) {
+      boolean indexable =
+          checkIfIndexablePredicate(evalNode.getLeftExpr()) &&
+              checkIfIndexablePredicate(evalNode.getRightExpr());
+
+      boolean sameVariable =
+          EvalTreeUtil.findDistinctRefColumns(evalNode.getLeftExpr())
+          .equals(EvalTreeUtil.findDistinctRefColumns(evalNode.getRightExpr()));
+
+      return indexable && sameVariable;
+    } else {
+      return false;
+    }
+  }
+
+  private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
+    if (scanNode.getInputPaths().length > 0) {
+      try {
+        FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
+        long totalVolume = 0;
+
+        for (Path input : scanNode.getInputPaths()) {
+          ContentSummary summary = fs.getContentSummary(input);
+          totalVolume += summary.getLength();
+          totalVolume += summary.getFileCount();
+        }
+        scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
+      } catch (IOException e) {
+        throw new PlanningException(e);
+      }
+    }
+  }
+
+  private final class Rewriter extends BasicLogicalPlanVisitor<LogicalPlan.QueryBlock, Object> {
+    @Override
+    public Object visitScan(LogicalPlan.QueryBlock block, LogicalPlan plan, ScanNode scanNode, Stack<LogicalNode> stack)
+        throws PlanningException {
+
+      TableDesc table = scanNode.getTableDesc();
+      if (!table.hasPartitions()) {
+        return null;
+      }
+
+      try {
+        Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
+        plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
+        PartitionedTableScanNode rewrittenScanNode = new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
+        updateTableStat(rewrittenScanNode);
+        PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
+      } catch (IOException e) {
+        throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
+      }
+      return null;
+    }
+  }
+}
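
The rewriter above prunes partitions level by level: it lists the table directory with a filter for the first partition column, lists the surviving directories with a filter for the first two columns, and so on, before swapping the ScanNode for a PartitionedTableScanNode over the remaining paths. A runnable sketch of that cascading listStatus pattern using plain Hadoop filesystem APIs; the directory layout and the equality predicate are made up for the example:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.PathFilter;
  import java.io.IOException;

  public class CascadingPruneExample {
    // keep only paths whose last segment is exactly "name=value"
    static PathFilter equalsFilter(final String name, final String value) {
      return new PathFilter() {
        public boolean accept(Path path) {
          return path.getName().equals(name + "=" + value);
        }
      };
    }

    static Path [] toPaths(FileStatus [] statuses) {
      Path [] paths = new Path[statuses.length];
      for (int i = 0; i < statuses.length; i++) {
        paths[i] = statuses[i].getPath();
      }
      return paths;
    }

    public static void main(String[] args) throws IOException {
      FileSystem fs = FileSystem.get(new Configuration());
      // assumed layout: /tmp/example_table/col1=A/col2=B/<data files>
      Path tableDir = new Path("/tmp/example_table");
      Path [] level1 = toPaths(fs.listStatus(tableDir, equalsFilter("col1", "A")));
      Path [] level2 = toPaths(fs.listStatus(level1, equalsFilter("col2", "B")));
      System.out.println(level2.length + " partition directories remain after pruning");
    }
  }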

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index d0e9a6f..403b403 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -304,7 +304,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
     LogicalNode child = visitChild(newContext, plan, subRoot, newStack);
     newStack.pop();
     Schema inSchema = (Schema) child.getOutSchema().clone();
-    inSchema.setQualifier(node.getCanonicalName(), true);
+    inSchema.setQualifier(node.getCanonicalName());
     node.setInSchema(inSchema);
     return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
   }


[3/3] git commit: TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables. (hyunsik)

Posted by hy...@apache.org.
TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/f58f6ee8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/f58f6ee8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/f58f6ee8

Branch: refs/heads/master
Commit: f58f6ee827139c01253902f0b1076b47fde9d5e6
Parents: 4dbecf9
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Dec 18 21:39:30 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Thu Dec 19 11:36:10 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/tajo/algebra/CreateTable.java    |  30 +-
 .../tajo/catalog/AbstractCatalogClient.java     |   2 +-
 .../org/apache/tajo/catalog/DDLBuilder.java     |  20 +-
 .../java/org/apache/tajo/catalog/Schema.java    |  57 ++-
 .../java/org/apache/tajo/catalog/TableDesc.java |  30 +-
 .../tajo/catalog/partition/PartitionDesc.java   | 194 ++++++++++
 .../tajo/catalog/partition/Partitions.java      | 349 -----------------
 .../src/main/proto/CatalogProtos.proto          |   7 +-
 .../org/apache/tajo/catalog/TestSchema.java     |  26 ++
 .../tajo/catalog/store/HCatalogStore.java       |   6 +-
 .../tajo/catalog/store/TestHCatalogStore.java   |   2 +-
 .../tajo/catalog/store/AbstractDBStore.java     |  42 +--
 .../apache/tajo/catalog/store/DerbyStore.java   |  42 +--
 .../org/apache/tajo/catalog/TestCatalog.java    |  72 ++--
 .../org/apache/tajo/catalog/TestDBStore.java    |  62 ++--
 .../org/apache/tajo/datum/BooleanDatum.java     |   2 +
 .../org/apache/tajo/datum/DatumFactory.java     |  34 ++
 .../main/java/org/apache/tajo/util/TUtil.java   |   2 +-
 .../org/apache/tajo/engine/parser/SQLLexer.g4   |   3 +-
 .../org/apache/tajo/engine/parser/SQLParser.g4  |   2 +-
 .../apache/tajo/engine/eval/AlgebraicUtil.java  | 134 ++++++-
 .../apache/tajo/engine/eval/EvalTreeUtil.java   |  62 +---
 .../org/apache/tajo/engine/eval/InEval.java     |   2 +-
 .../tajo/engine/eval/LikePredicateEval.java     |   4 +
 .../apache/tajo/engine/parser/SQLAnalyzer.java  |  10 +-
 .../engine/planner/BasicLogicalPlanVisitor.java |   8 +
 .../tajo/engine/planner/LogicalOptimizer.java   |  11 +-
 .../tajo/engine/planner/LogicalPlanVisitor.java |   2 +
 .../tajo/engine/planner/LogicalPlanner.java     |  72 ++--
 .../engine/planner/PhysicalPlannerImpl.java     |   4 +-
 .../apache/tajo/engine/planner/PlannerUtil.java |   2 +-
 .../org/apache/tajo/engine/planner/Target.java  |   4 +
 .../engine/planner/global/ExecutionBlock.java   |   4 +
 .../engine/planner/global/GlobalPlanner.java    |   2 +-
 .../engine/planner/logical/CreateTableNode.java |  20 +-
 .../tajo/engine/planner/logical/NodeType.java   |   1 +
 .../logical/PartitionedTableScanNode.java       | 180 +++++++++
 .../engine/planner/logical/RelationNode.java    |   2 +-
 .../tajo/engine/planner/logical/ScanNode.java   |  19 +-
 .../engine/planner/logical/StoreTableNode.java  |  24 +-
 .../planner/logical/TableSubQueryNode.java      |   2 +-
 .../join/GreedyHeuristicJoinOrderAlgorithm.java |   8 +-
 .../engine/planner/logical/join/JoinGraph.java  |   3 +-
 .../ColumnPartitionedTableStoreExec.java        |  65 +++-
 .../engine/planner/physical/PhysicalExec.java   |   6 +-
 .../engine/planner/physical/SeqScanExec.java    |  69 +++-
 .../planner/rewrite/FilterPushDownRule.java     |  21 +-
 .../rewrite/PartitionedTableRewriter.java       | 371 +++++++++++++++++++
 .../planner/rewrite/ProjectionPushDownRule.java |   2 +-
 .../org/apache/tajo/engine/utils/TupleUtil.java | 106 ++++++
 .../tajo/master/DefaultTaskScheduler.java       |  19 +-
 .../org/apache/tajo/master/GlobalEngine.java    |  12 +-
 .../tajo/master/TajoMasterClientService.java    |  10 +-
 .../master/querymaster/QueryMasterTask.java     |  10 +-
 .../tajo/master/querymaster/SubQuery.java       |  44 ++-
 .../src/main/proto/ClientProtos.proto           |   2 +-
 .../org/apache/tajo/client/TestTajoClient.java  |   4 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   2 +-
 .../tajo/engine/eval/TestEvalTreeUtil.java      |  24 +-
 .../tajo/engine/parser/TestSQLAnalyzer.java     |   7 +-
 .../engine/planner/TestLogicalOptimizer.java    |   2 +-
 .../tajo/engine/planner/TestLogicalPlan.java    |   2 +-
 .../planner/physical/TestBSTIndexExec.java      |   2 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   2 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   2 +-
 .../planner/physical/TestPhysicalPlanner.java   |   2 +-
 .../engine/planner/physical/TestSortExec.java   |   2 +-
 .../tajo/engine/query/TestInsertQuery.java      | 201 ----------
 .../tajo/engine/query/TestTablePartitions.java  | 360 ++++++++++++++++++
 .../apache/tajo/engine/util/TestTupleUtil.java  |  52 ++-
 .../tajo/master/TestExecutionBlockCursor.java   |   2 +-
 .../apache/tajo/master/TestGlobalPlanner.java   |   2 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   2 +-
 .../create_table_partition_by_column.sql        |   2 +-
 75 files changed, 2000 insertions(+), 976 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5d6ba59..cceb1cb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,9 @@ Release 0.8.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables.
+    (hyunsik)
+
     TAJO-333: Add metric system to Tajo. (hyoungjunkim via jihoon)
 
     TAJO-413: Implement pi function. (DaeMyung Kang via jihoon)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
index a6bc1e7..a85e366 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
@@ -31,7 +31,7 @@ public class CreateTable extends Expr {
   private String location;
   private Expr subquery;
   private Map<String, String> params;
-  private PartitionOption partition;
+  private PartitionDescExpr partition;
 
   public CreateTable(final String tableName) {
     super(OpType.CreateTable);
@@ -107,11 +107,11 @@ public class CreateTable extends Expr {
     return partition != null;
   }
 
-  public void setPartition(PartitionOption partition) {
+  public void setPartition(PartitionDescExpr partition) {
     this.partition = partition;
   }
 
-  public <T extends PartitionOption> T getPartition() {
+  public <T extends PartitionDescExpr> T getPartition() {
     return (T) this.partition;
   }
 
@@ -178,10 +178,10 @@ public class CreateTable extends Expr {
     COLUMN
   }
 
-  public static abstract class PartitionOption {
+  public static abstract class PartitionDescExpr {
     PartitionType type;
 
-    public PartitionOption(PartitionType type) {
+    public PartitionDescExpr(PartitionType type) {
       this.type = type;
     }
 
@@ -190,7 +190,7 @@ public class CreateTable extends Expr {
     }
   }
 
-  public static class RangePartition extends PartitionOption {
+  public static class RangePartition extends PartitionDescExpr {
     ColumnReferenceExpr [] columns;
     List<RangePartitionSpecifier> specifiers;
 
@@ -209,7 +209,7 @@ public class CreateTable extends Expr {
     }
   }
 
-  public static class HashPartition extends PartitionOption {
+  public static class HashPartition extends PartitionDescExpr {
     ColumnReferenceExpr [] columns;
     Expr quantity;
     List<PartitionSpecifier> specifiers;
@@ -246,7 +246,7 @@ public class CreateTable extends Expr {
     }
   }
 
-  public static class ListPartition extends PartitionOption {
+  public static class ListPartition extends PartitionDescExpr {
     ColumnReferenceExpr [] columns;
     List<ListPartitionSpecifier> specifiers;
 
@@ -265,17 +265,23 @@ public class CreateTable extends Expr {
     }
   }
 
-  public static class ColumnPartition extends PartitionOption {
-    ColumnReferenceExpr [] columns;
+  public static class ColumnPartition extends PartitionDescExpr {
+    private ColumnDefinition [] columns;
+    private boolean isOmitValues;
 
-    public ColumnPartition(ColumnReferenceExpr [] columns) {
+    public ColumnPartition(ColumnDefinition [] columns, boolean isOmitValues) {
       super(PartitionType.COLUMN);
       this.columns = columns;
+      this.isOmitValues = isOmitValues;
     }
 
-    public ColumnReferenceExpr [] getColumns() {
+    public ColumnDefinition [] getColumns() {
       return columns;
     }
+
+    public boolean isOmitValues() {
+      return isOmitValues;
+    }
   }
 
   public static class RangePartitionSpecifier extends PartitionSpecifier {
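
In the reworked hierarchy, PartitionDescExpr is the abstract base that records the partition type, with one subclass per scheme, and ColumnPartition now carries full column definitions plus an isOmitValues flag indicating that partition values are not stored in the rows. A compact self-contained mirror of that shape, with names prefixed "Example" to make clear these are not the real Tajo classes:

  public class PartitionExprExample {
    enum PartitionType { RANGE, HASH, LIST, COLUMN }

    static abstract class ExamplePartitionDescExpr {
      final PartitionType type;
      ExamplePartitionDescExpr(PartitionType type) { this.type = type; }
      PartitionType getPartitionType() { return type; }
    }

    static class ExampleColumnPartition extends ExamplePartitionDescExpr {
      final String [] columns;   // stands in for ColumnDefinition [] in the real class
      final boolean omitValues;
      ExampleColumnPartition(String [] columns, boolean omitValues) {
        super(PartitionType.COLUMN);
        this.columns = columns;
        this.omitValues = omitValues;
      }
    }

    public static void main(String[] args) {
      ExampleColumnPartition p = new ExampleColumnPartition(new String[]{"country"}, true);
      System.out.println(p.getPartitionType() + ", omitValues=" + p.omitValues);
    }
  }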

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 5f79f10..44ac8f4 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -122,7 +122,7 @@ public abstract class AbstractCatalogClient implements CatalogService {
       return new ServerCallable<Boolean>(conf, catalogServerAddr, CatalogProtocol.class, false) {
         public Boolean call(NettyClientBase client) throws ServiceException {
           CatalogProtocolService.BlockingInterface stub = getStub(client);
-          return stub.addTable(null, (TableDescProto) desc.getProto()).getValue();
+          return stub.addTable(null, desc.getProto()).getValue();
         }
       }.withRetries();
     } catch (ServiceException e) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
index a9d0f03..c818be7 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/DDLBuilder.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.catalog;
 
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
@@ -96,15 +96,15 @@ public class DDLBuilder {
   }
 
   private static void buildPartitionClause(StringBuilder sb, TableDesc desc) {
-    Partitions partitions = desc.getPartitions();
+    PartitionDesc partitionDesc = desc.getPartitions();
 
     sb.append(" PARTITION BY ");
-    sb.append(partitions.getPartitionsType().name());
+    sb.append(partitionDesc.getPartitionsType().name());
 
     // columns
     sb.append("(");
     int columnCount = 0;
-    for(Column column: partitions.getColumns()) {
+    for(Column column: partitionDesc.getColumns()) {
       for(Column targetColumn: desc.getSchema().getColumns()) {
         if (column.getColumnName().equals(targetColumn.getColumnName()))  {
           if (columnCount > 0)
@@ -118,12 +118,12 @@ public class DDLBuilder {
     sb.append(")");
 
     // specifier
-    if (partitions.getSpecifiers() != null
-        && !partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
+    if (partitionDesc.getSpecifiers() != null
+        && !partitionDesc.getPartitionsType().equals(CatalogProtos.PartitionsType.COLUMN)) {
 
       sb.append(" (");
-      for(int i = 0; i < partitions.getSpecifiers().size(); i++) {
-        Specifier specifier = partitions.getSpecifiers().get(i);
+      for(int i = 0; i < partitionDesc.getSpecifiers().size(); i++) {
+        Specifier specifier = partitionDesc.getSpecifiers().get(i);
         if (i > 0)
           sb.append(",");
 
@@ -132,7 +132,7 @@ public class DDLBuilder {
         if (!specifier.getName().isEmpty())
           sb.append(" ").append(specifier.getName());
 
-        if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
+        if (partitionDesc.getPartitionsType().equals(CatalogProtos.PartitionsType.LIST)) {
           if (!specifier.getExpressions().isEmpty()) {
             sb.append(" VALUES (");
             String[] expressions = specifier.getExpressions().split("\\,");
@@ -144,7 +144,7 @@ public class DDLBuilder {
             sb.append(")");
 
           }
-        } else if (partitions.getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE))  {
+        } else if (partitionDesc.getPartitionsType().equals(CatalogProtos.PartitionsType.RANGE))  {
           sb.append(" VALUES LESS THAN (");
           if (!specifier.getExpressions().isEmpty()) {
             sb.append(specifier.getExpressions());
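
buildPartitionClause appends " PARTITION BY <type>(" followed by the partition columns that also appear in the table schema and, for LIST and RANGE partitions, a specifier list. A standalone sketch that reproduces only the column-partition shape of that string with plain Java; the column names are samples, and the real method works against Tajo's catalog objects rather than strings:

  public class PartitionClauseExample {
    // emits e.g. " PARTITION BY COLUMN(country,year)"
    static String columnPartitionClause(String [] partitionColumns) {
      StringBuilder sb = new StringBuilder(" PARTITION BY COLUMN(");
      for (int i = 0; i < partitionColumns.length; i++) {
        if (i > 0) {
          sb.append(",");
        }
        sb.append(partitionColumns[i]);
      }
      return sb.append(")").toString();
    }

    public static void main(String[] args) {
      System.out.println(columnPartitionClause(new String[]{"country", "year"}));
    }
  }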

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
index 8a2d028..4ea3b81 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java
@@ -39,26 +39,24 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
 	private	SchemaProto.Builder builder = SchemaProto.newBuilder();
 
 	@Expose protected List<Column> fields = null;
-	@Expose protected Map<String, Integer> fieldsByQialifiedName = null;
+	@Expose protected Map<String, Integer> fieldsByQualifiedName = null;
   @Expose protected Map<String, List<Integer>> fieldsByName = null;
 
 	public Schema() {
-    this.fields = new ArrayList<Column>();
-    this.fieldsByQialifiedName = new TreeMap<String, Integer>();
-    this.fieldsByName = new HashMap<String, List<Integer>>();
+    init();
 	}
 	
 	public Schema(SchemaProto proto) {
     this.fields = new ArrayList<Column>();
-    this.fieldsByQialifiedName = new HashMap<String, Integer>();
+    this.fieldsByQualifiedName = new HashMap<String, Integer>();
     this.fieldsByName = new HashMap<String, List<Integer>>();
     for(ColumnProto colProto : proto.getFieldsList()) {
       Column tobeAdded = new Column(colProto);
       fields.add(tobeAdded);
       if (tobeAdded.hasQualifier()) {
-        fieldsByQialifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(), fields.size() - 1);
+        fieldsByQualifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(), fields.size() - 1);
       } else {
-        fieldsByQialifiedName.put(tobeAdded.getColumnName(), fields.size() - 1);
+        fieldsByQualifiedName.put(tobeAdded.getColumnName(), fields.size() - 1);
       }
       if (fieldsByName.containsKey(tobeAdded.getColumnName())) {
         fieldsByName.get(tobeAdded.getColumnName()).add(fields.size() - 1);
@@ -71,17 +69,23 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
 	public Schema(Schema schema) {
 	  this();
 		this.fields.addAll(schema.fields);
-		this.fieldsByQialifiedName.putAll(schema.fieldsByQialifiedName);
+		this.fieldsByQualifiedName.putAll(schema.fieldsByQualifiedName);
     this.fieldsByName.putAll(schema.fieldsByName);
 	}
-	
+
 	public Schema(Column [] columns) {
-    this();
+    init();
     for(Column c : columns) {
       addColumn(c);
     }
   }
 
+  private void init() {
+    this.fields = new ArrayList<Column>();
+    this.fieldsByQualifiedName = new HashMap<String, Integer>();
+    this.fieldsByName = new HashMap<String, List<Integer>>();
+  }
+
   /**
    * Set a qualifier to this schema.
    * This changes the qualifier of all columns except for not-qualified columns.
@@ -89,26 +93,10 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
    * @param qualifier The qualifier
    */
   public void setQualifier(String qualifier) {
-    setQualifier(qualifier, false);
-  }
-
-  /**
-   * Set a qualifier to this schema. This changes the qualifier of all columns if force is true.
-   * Otherwise, it changes the qualifier of all columns except for non-qualified columns
-   *
-   * @param qualifier The qualifier
-   * @param force If true, all columns' qualifiers will be changed. Otherwise, only qualified columns' qualifiers will
-   *              be changed.
-   */
-  public void setQualifier(String qualifier, boolean force) {
-    fieldsByQialifiedName.clear();
-
+    fieldsByQualifiedName.clear();
     for (int i = 0; i < getColumnNum(); i++) {
-      if (!force && fields.get(i).hasQualifier()) {
-        continue;
-      }
       fields.get(i).setQualifier(qualifier);
-      fieldsByQialifiedName.put(fields.get(i).getQualifiedName(), i);
+      fieldsByQualifiedName.put(fields.get(i).getQualifiedName(), i);
     }
   }
 	
@@ -121,7 +109,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
   }
 
 	public Column getColumnByFQN(String qualifiedName) {
-		Integer cid = fieldsByQialifiedName.get(qualifiedName.toLowerCase());
+		Integer cid = fieldsByQualifiedName.get(qualifiedName.toLowerCase());
 		return cid != null ? fields.get(cid) : null;
 	}
 	
@@ -151,13 +139,14 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
 	}
 	
 	public int getColumnId(String qualifiedName) {
-	  return fieldsByQialifiedName.get(qualifiedName.toLowerCase());
+	  return fieldsByQualifiedName.get(qualifiedName.toLowerCase());
 	}
 
   public int getColumnIdByName(String colName) {
     for (Column col : fields) {
       if (col.getColumnName().equals(colName.toLowerCase())) {
-        return fieldsByQialifiedName.get(col.getQualifiedName());
+        String qualifiedName = col.getQualifiedName();
+        return fieldsByQualifiedName.get(qualifiedName);
       }
     }
     return -1;
@@ -168,7 +157,7 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
 	}
 	
 	public boolean contains(String colName) {
-		return fieldsByQialifiedName.containsKey(colName.toLowerCase());
+		return fieldsByQualifiedName.containsKey(colName.toLowerCase());
 
 	}
 
@@ -189,14 +178,14 @@ public class Schema implements ProtoObject<SchemaProto>, Cloneable, GsonObject {
 
   public synchronized Schema addColumn(String name, DataType dataType) {
 		String normalized = name.toLowerCase();
-		if(fieldsByQialifiedName.containsKey(normalized)) {
+		if(fieldsByQualifiedName.containsKey(normalized)) {
 		  LOG.error("Already exists column " + normalized);
 			throw new AlreadyExistsFieldException(normalized);
 		}
 			
 		Column newCol = new Column(normalized, dataType);
 		fields.add(newCol);
-		fieldsByQialifiedName.put(newCol.getQualifiedName(), fields.size() - 1);
+		fieldsByQualifiedName.put(newCol.getQualifiedName(), fields.size() - 1);
     fieldsByName.put(newCol.getColumnName(), TUtil.newList(fields.size() - 1));
 		
 		return this;

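For illustration, a minimal sketch of the renamed qualified-name index and the simplified setQualifier() after this change, mirroring the assertions added to TestSchema later in this mail; the wrapper class name is only illustrative.

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;

public class SchemaQualifierSketch {
  public static void main(String[] args) {
    Schema schema = new Schema();
    schema.addColumn("tb1.col1", Type.INT4);   // qualified column
    schema.addColumn("col2", Type.INT4);       // non-qualified column

    // setQualifier() now re-qualifies every column and rebuilds the
    // fieldsByQualifiedName index; the old "force" flag is gone.
    schema.setQualifier("tb2");

    System.out.println(schema.getColumn(0).getQualifiedName());            // tb2.col1
    System.out.println(schema.getColumn(1).getQualifiedName());            // tb2.col2
    System.out.println(schema.getColumnByFQN("tb2.col2").getColumnName()); // col2
  }
}
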
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
index 458a99a..8fffc6e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableDesc.java
@@ -25,12 +25,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableDescProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
 
 public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Cloneable {
   private final Log LOG = LogFactory.getLog(TableDesc.class);
@@ -42,7 +43,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
   @Expose protected TableMeta meta; // required
   @Expose protected Path uri; // required
   @Expose	protected TableStats stats; // optional
-  @Expose protected Partitions partitions; //optional
+  @Expose protected PartitionDesc partitionDesc; //optional
   
 	public TableDesc() {
 		builder = TableDescProto.newBuilder();
@@ -65,7 +66,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
 	  this(proto.getId(), new Schema(proto.getSchema()), new TableMeta(proto.getMeta()), new Path(proto.getPath()));
     this.stats = new TableStats(proto.getStats());
     if (proto.getPartitions() != null && !proto.getPartitions().toString().isEmpty()) {
-      this.partitions = new Partitions(proto.getPartitions());
+      this.partitionDesc = new PartitionDesc(proto.getPartitions());
     }
 	}
 	
@@ -115,22 +116,27 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
   }
 
   public boolean hasPartitions() {
-    return this.partitions != null;
+    return this.partitionDesc != null;
   }
 
-  public Partitions getPartitions() {
-    return partitions;
+  public PartitionDesc getPartitions() {
+    return partitionDesc;
   }
 
-  public void setPartitions(Partitions partitions) {
-    this.partitions = partitions;
+  public void setPartitions(PartitionDesc partitionDesc) {
+    this.partitionDesc = partitionDesc;
   }
 
   public boolean equals(Object object) {
     if(object instanceof TableDesc) {
       TableDesc other = (TableDesc) object;
       
-      return this.getProto().equals(other.getProto());
+      boolean eq = tableName.equals(other.tableName);
+      eq = eq && schema.equals(other.schema);
+      eq = eq && meta.equals(other.meta);
+      eq = eq && uri.equals(other.uri);
+      eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
+      return eq && TUtil.checkEquals(stats, other.stats);
     }
     
     return false;   
@@ -144,7 +150,7 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
     desc.meta = (TableMeta) meta.clone();
     desc.uri = uri;
     desc.stats = stats != null ? (TableStats) stats.clone() : null;
-    desc.partitions = partitions != null ? (Partitions) partitions.clone() : null;
+    desc.partitionDesc = partitionDesc != null ? (PartitionDesc) partitionDesc.clone() : null;
 	  
 	  return desc;
 	}
@@ -178,8 +184,8 @@ public class TableDesc implements ProtoObject<TableDescProto>, GsonObject, Clone
     if (this.stats != null) {
       builder.setStats(this.stats.getProto());
     }
-    if (this.partitions != null) {
-      builder.setPartitions(this.partitions.getProto());
+    if (this.partitionDesc != null) {
+      builder.setPartitions(this.partitionDesc.getProto());
     }
     return builder.build();
   }

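A minimal sketch of the null-safe comparison contract assumed of the TUtil.checkEquals() calls in the new equals() above. This is not the actual org.apache.tajo.util.TUtil source, only the behavior the field-by-field comparison relies on for the optional stats and partitionDesc fields.

// Hypothetical stand-in illustrating the contract assumed of TUtil.checkEquals();
// not the actual org.apache.tajo.util.TUtil implementation.
public final class NullSafe {
  public static boolean checkEquals(Object a, Object b) {
    if (a == null && b == null) {
      return true;      // both optional fields unset
    }
    if (a == null || b == null) {
      return false;     // only one side set
    }
    return a.equals(b); // both set: defer to equals()
  }
}

The new PartitionDesc.equals() below relies on the same helper for its optional specifier list.
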
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
new file mode 100644
index 0000000..fbec807
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.collect.ImmutableList;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescProto>, Cloneable, GsonObject {
+  @Expose protected CatalogProtos.PartitionsType partitionsType; //required
+  @Expose protected Schema schema;
+  @Expose protected int numPartitions; //optional
+  @Expose protected List<Specifier> specifiers; //optional
+  @Expose protected boolean isOmitValues = false; // optional;
+
+  private CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+
+  public PartitionDesc() {
+  }
+
+  public PartitionDesc(PartitionDesc partition) {
+    this();
+    this.partitionsType = partition.partitionsType;
+    this.schema = partition.schema;
+    this.numPartitions = partition.numPartitions;
+    this.specifiers = partition.specifiers;
+  }
+
+  public PartitionDesc(CatalogProtos.PartitionsType partitionsType, Column[] columns, int numPartitions,
+                       List<Specifier> specifiers) {
+    this();
+    this.partitionsType = partitionsType;
+    for (Column c : columns) {
+      addColumn(c);
+    }
+    this.numPartitions = numPartitions;
+    this.specifiers = specifiers;
+  }
+
+  public PartitionDesc(CatalogProtos.PartitionDescProto proto) {
+    this.partitionsType = proto.getPartitionsType();
+    this.schema = new Schema(proto.getSchema());
+    this.numPartitions = proto.getNumPartitions();
+    this.isOmitValues = proto.getIsOmitValues();
+    if(proto.getSpecifiersList() != null) {
+      this.specifiers = TUtil.newList();
+      for(CatalogProtos.SpecifierProto specifier: proto.getSpecifiersList()) {
+        this.specifiers.add(new Specifier(specifier));
+      }
+    }
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public List<Column> getColumns() {
+    return ImmutableList.copyOf(schema.toArray());
+  }
+
+  public void setColumns(Collection<Column> columns) {
+    this.schema = new Schema(columns.toArray(new Column[columns.size()]));
+  }
+
+  public synchronized void addColumn(Column column) {
+    if (schema == null) {
+      schema = new Schema();
+    }
+    schema.addColumn(column);
+  }
+
+  public synchronized void addSpecifier(Specifier specifier) {
+    if(specifiers == null)
+      specifiers = TUtil.newList();
+
+    specifiers.add(specifier);
+  }
+
+  public CatalogProtos.PartitionsType getPartitionsType() {
+    return partitionsType;
+  }
+
+  public void setPartitionsType(CatalogProtos.PartitionsType partitionsType) {
+    this.partitionsType = partitionsType;
+  }
+
+  public int getNumPartitions() {
+    return numPartitions;
+  }
+
+  public void setNumPartitions(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  public List<Specifier> getSpecifiers() {
+    return specifiers;
+  }
+
+  public void setSpecifiers(List<Specifier> specifiers) {
+    this.specifiers = specifiers;
+  }
+
+  public void setOmitValues(boolean flag) {
+    isOmitValues = flag;
+  }
+
+  public boolean isOmitValues() {
+    return isOmitValues;
+  }
+
+  public boolean equals(Object o) {
+    if (o instanceof PartitionDesc) {
+      PartitionDesc another = (PartitionDesc) o;
+      boolean eq = partitionsType == another.partitionsType;
+      eq = eq && schema.equals(another.schema);
+      eq = eq && numPartitions == another.numPartitions;
+      eq = eq && TUtil.checkEquals(specifiers, another.specifiers);
+      eq = eq && isOmitValues == another.isOmitValues;
+      return eq;
+    }
+    return false;
+  }
+
+  public Object clone() throws CloneNotSupportedException {
+    PartitionDesc copy = (PartitionDesc) super.clone();
+    copy.builder = CatalogProtos.PartitionDescProto.newBuilder();
+    copy.setPartitionsType(this.partitionsType);
+    copy.schema = new Schema(schema.getProto());
+    copy.setNumPartitions(this.numPartitions);
+    copy.specifiers = new ArrayList<Specifier>(this.specifiers);
+    copy.isOmitValues = isOmitValues;
+
+    return copy;
+  }
+
+  @Override
+  public CatalogProtos.PartitionDescProto getProto() {
+    if (builder == null) {
+      builder = CatalogProtos.PartitionDescProto.newBuilder();
+    }
+    if (this.partitionsType != null) {
+      builder.setPartitionsType(this.partitionsType);
+    }
+    builder.setSchema(schema.getProto());
+    builder.setNumPartitions(numPartitions);
+    builder.setIsOmitValues(isOmitValues);
+    if (this.specifiers != null) {
+      for(Specifier specifier: specifiers) {
+        builder.addSpecifiers(specifier.getProto());
+      }
+    }
+    return builder.build();
+  }
+
+  public String toString() {
+    Gson gson = new GsonBuilder().setPrettyPrinting().
+        excludeFieldsWithoutExposeAnnotation().create();
+    return gson.toJson(this);
+  }
+
+  @Override
+  public String toJson() {
+    return CatalogGsonHelper.toJson(this, PartitionDesc.class);
+
+  }
+}
\ No newline at end of file

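A minimal usage sketch for the new class, following the pattern of the TestCatalog and TestDBStore changes later in this mail; the wrapper class name and the "type" column are only examples.

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes.Type;

public class PartitionDescUsageSketch {
  public static void main(String[] args) {
    // Column-partitioned table metadata: partition columns are now kept in
    // an embedded Schema rather than the bare column list of the old
    // Partitions class.
    PartitionDesc partitionDesc = new PartitionDesc();
    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
    partitionDesc.addColumn(new Column("type", Type.TEXT));

    // Callers now reach partition columns through the embedded schema,
    // e.g. table.getPartitions().getSchema().getColumn(0) in the updated tests.
    System.out.println(partitionDesc.getSchema().getColumn(0).getColumnName()); // type
    System.out.println(partitionDesc.getColumns().size());                      // 1

    // getProto() serializes into the new PartitionDescProto message.
    CatalogProtos.PartitionDescProto proto = partitionDesc.getProto();
    System.out.println(proto.getPartitionsType()); // COLUMN
  }
}
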
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
deleted file mode 100644
index c82f0cb..0000000
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/Partitions.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.catalog.partition;
-
-import com.google.common.collect.ImmutableList;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.annotations.Expose;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.exception.AlreadyExistsFieldException;
-import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.json.GsonObject;
-import org.apache.tajo.util.TUtil;
-
-import java.util.*;
-
-public class Partitions implements ProtoObject<CatalogProtos.PartitionsProto>, Cloneable, GsonObject {
-
-  private static final Log LOG = LogFactory.getLog(Partitions.class);
-
-  @Expose protected CatalogProtos.PartitionsType partitionsType; //required
-  @Expose protected List<Column> columns; //required
-  @Expose protected int numPartitions; //optional
-  @Expose protected List<Specifier> specifiers; //optional
-  @Expose protected Map<String, Integer> columnsByQialifiedName = null;
-  @Expose protected Map<String, List<Integer>> columnsByName = null;
-
-  private CatalogProtos.PartitionsProto.Builder builder = CatalogProtos.PartitionsProto.newBuilder();
-
-  public Partitions() {
-    this.columns = new ArrayList<Column>();
-    this.columnsByQialifiedName = new TreeMap<String, Integer>();
-    this.columnsByName = new HashMap<String, List<Integer>>();
-  }
-
-  public Partitions(Partitions partition) {
-    this();
-    this.partitionsType = partition.partitionsType;
-    this.columns.addAll(partition.columns);
-    this.columnsByQialifiedName.putAll(partition.columnsByQialifiedName);
-    this.columnsByName.putAll(partition.columnsByName);
-    this.numPartitions = partition.numPartitions;
-    this.specifiers = partition.specifiers;
-  }
-
-  public Partitions(CatalogProtos.PartitionsType partitionsType, Column[] columns, int numPartitions,
-                   List<Specifier> specifiers) {
-    this();
-    this.partitionsType = partitionsType;
-    for (Column c : columns) {
-      addColumn(c);
-    }
-    this.numPartitions = numPartitions;
-    this.specifiers = specifiers;
-  }
-
-  public Partitions(CatalogProtos.PartitionsProto proto) {
-    this.partitionsType = proto.getPartitionsType();
-    this.columns = new ArrayList<Column>();
-    this.columnsByQialifiedName = new HashMap<String, Integer>();
-    this.columnsByName = new HashMap<String, List<Integer>>();
-    for (CatalogProtos.ColumnProto colProto : proto.getColumnsList()) {
-      Column tobeAdded = new Column(colProto);
-      columns.add(tobeAdded);
-      if (tobeAdded.hasQualifier()) {
-        columnsByQialifiedName.put(tobeAdded.getQualifier() + "." + tobeAdded.getColumnName(),
-            columns.size() - 1);
-      } else {
-        columnsByQialifiedName.put(tobeAdded.getColumnName(), columns.size() - 1);
-      }
-      if (columnsByName.containsKey(tobeAdded.getColumnName())) {
-        columnsByName.get(tobeAdded.getColumnName()).add(columns.size() - 1);
-      } else {
-        columnsByName.put(tobeAdded.getColumnName(), TUtil.newList(columns.size() - 1));
-      }
-    }
-    this.numPartitions = proto.getNumPartitions();
-    if(proto.getSpecifiersList() != null) {
-      this.specifiers = TUtil.newList();
-      for(CatalogProtos.SpecifierProto specifier: proto.getSpecifiersList()) {
-        this.specifiers.add(new Specifier(specifier));
-      }
-    }
-  }
-
-  /**
-   * Set a qualifier to this schema.
-   * This changes the qualifier of all columns except for not-qualified columns.
-   *
-   * @param qualifier The qualifier
-   */
-  public void setQualifier(String qualifier) {
-    setQualifier(qualifier, false);
-  }
-
-  /**
-   * Set a qualifier to this schema. This changes the qualifier of all columns if force is true.
-   * Otherwise, it changes the qualifier of all columns except for non-qualified columns
-   *
-   * @param qualifier The qualifier
-   * @param force     If true, all columns' qualifiers will be changed. Otherwise,
-   *                  only qualified columns' qualifiers will
-   *                  be changed.
-   */
-  public void setQualifier(String qualifier, boolean force) {
-    columnsByQialifiedName.clear();
-
-    for (int i = 0; i < getColumnNum(); i++) {
-      if (!force && columns.get(i).hasQualifier()) {
-        continue;
-      }
-      columns.get(i).setQualifier(qualifier);
-      columnsByQialifiedName.put(columns.get(i).getQualifiedName(), i);
-    }
-  }
-
-  public int getColumnNum() {
-    return this.columns.size();
-  }
-
-  public Column getColumn(int id) {
-    return columns.get(id);
-  }
-
-  public Column getColumnByFQN(String qualifiedName) {
-    Integer cid = columnsByQialifiedName.get(qualifiedName.toLowerCase());
-    return cid != null ? columns.get(cid) : null;
-  }
-
-  public Column getColumnByName(String colName) {
-    String normalized = colName.toLowerCase();
-    List<Integer> list = columnsByName.get(normalized);
-
-    if (list == null || list.size() == 0) {
-      return null;
-    }
-
-    if (list.size() == 1) {
-      return columns.get(list.get(0));
-    } else {
-      StringBuilder sb = new StringBuilder();
-      boolean first = true;
-      for (Integer id : list) {
-        if (first) {
-          first = false;
-        } else {
-          sb.append(", ");
-        }
-        sb.append(columns.get(id));
-      }
-      throw new RuntimeException("Ambiguous Column Name: " + sb.toString());
-    }
-  }
-
-  public int getColumnId(String qualifiedName) {
-    return columnsByQialifiedName.get(qualifiedName.toLowerCase());
-  }
-
-  public int getColumnIdByName(String colName) {
-    for (Column col : columns) {
-      if (col.getColumnName().equals(colName.toLowerCase())) {
-        return columnsByQialifiedName.get(col.getQualifiedName());
-      }
-    }
-    return -1;
-  }
-
-  public List<Column> getColumns() {
-    return ImmutableList.copyOf(columns);
-  }
-
-  public void setColumns(List<Column> columns) {
-    this.columns = columns;
-  }
-
-  public boolean contains(String colName) {
-    return columnsByQialifiedName.containsKey(colName.toLowerCase());
-
-  }
-
-  public boolean containsAll(Collection<Column> columns) {
-    return columns.containsAll(columns);
-  }
-
-  public synchronized Partitions addColumn(String name, TajoDataTypes.Type type) {
-    if (type == TajoDataTypes.Type.CHAR) {
-      return addColumn(name, CatalogUtil.newDataTypeWithLen(type, 1));
-    }
-    return addColumn(name, CatalogUtil.newSimpleDataType(type));
-  }
-
-  public synchronized Partitions addColumn(String name, TajoDataTypes.Type type, int length) {
-    return addColumn(name, CatalogUtil.newDataTypeWithLen(type, length));
-  }
-
-  public synchronized Partitions addColumn(String name, TajoDataTypes.DataType dataType) {
-    String normalized = name.toLowerCase();
-    if (columnsByQialifiedName.containsKey(normalized)) {
-      LOG.error("Already exists column " + normalized);
-      throw new AlreadyExistsFieldException(normalized);
-    }
-
-    Column newCol = new Column(normalized, dataType);
-    columns.add(newCol);
-    columnsByQialifiedName.put(newCol.getQualifiedName(), columns.size() - 1);
-    columnsByName.put(newCol.getColumnName(), TUtil.newList(columns.size() - 1));
-
-    return this;
-  }
-
-  public synchronized void addColumn(Column column) {
-    addColumn(column.getQualifiedName(), column.getDataType());
-  }
-
-  public synchronized void addColumns(Partitions schema) {
-    for (Column column : schema.getColumns()) {
-      addColumn(column);
-    }
-  }
-
-  public synchronized void addSpecifier(Specifier specifier) {
-    if(specifiers == null)
-      specifiers = TUtil.newList();
-
-    specifiers.add(specifier);
-  }
-
-  public CatalogProtos.PartitionsType getPartitionsType() {
-    return partitionsType;
-  }
-
-  public void setPartitionsType(CatalogProtos.PartitionsType partitionsType) {
-    this.partitionsType = partitionsType;
-  }
-
-  public int getNumPartitions() {
-    return numPartitions;
-  }
-
-  public void setNumPartitions(int numPartitions) {
-    this.numPartitions = numPartitions;
-  }
-
-  public List<Specifier> getSpecifiers() {
-    return specifiers;
-  }
-
-  public void setSpecifiers(List<Specifier> specifiers) {
-    this.specifiers = specifiers;
-  }
-
-  public Map<String, Integer> getColumnsByQialifiedName() {
-    return columnsByQialifiedName;
-  }
-
-  public void setColumnsByQialifiedName(Map<String, Integer> columnsByQialifiedName) {
-    this.columnsByQialifiedName = columnsByQialifiedName;
-  }
-
-  public Map<String, List<Integer>> getColumnsByName() {
-    return columnsByName;
-  }
-
-  public void setColumnsByName(Map<String, List<Integer>> columnsByName) {
-    this.columnsByName = columnsByName;
-  }
-
-  public boolean equals(Object o) {
-    if (o instanceof Partitions) {
-      Partitions other = (Partitions) o;
-      return getProto().equals(other.getProto());
-    }
-    return false;
-  }
-
-  public Object clone() throws CloneNotSupportedException {
-    Partitions clone = (Partitions) super.clone();
-    clone.builder = CatalogProtos.PartitionsProto.newBuilder();
-    clone.setPartitionsType(this.partitionsType);
-    clone.setColumns(this.columns);
-    clone.setNumPartitions(this.numPartitions);
-    clone.specifiers = new ArrayList<Specifier>(this.specifiers);
-
-    return clone;
-  }
-
-  @Override
-  public CatalogProtos.PartitionsProto getProto() {
-    if (builder == null) {
-      builder = CatalogProtos.PartitionsProto.newBuilder();
-    }
-    if (this.partitionsType != null) {
-      builder.setPartitionsType(this.partitionsType);
-    }
-    builder.clearColumns();
-    if (this.columns != null) {
-      for (Column col : columns) {
-        builder.addColumns(col.getProto());
-      }
-    }
-    builder.setNumPartitions(numPartitions);
-
-    if (this.specifiers != null) {
-      for(Specifier specifier: specifiers) {
-        builder.addSpecifiers(specifier.getProto());
-      }
-    }
-    return builder.build();
-  }
-
-  public String toString() {
-    Gson gson = new GsonBuilder().setPrettyPrinting().
-        excludeFieldsWithoutExposeAnnotation().create();
-    return gson.toJson(this);
-  }
-
-  @Override
-  public String toJson() {
-    return CatalogGsonHelper.toJson(this, Partitions.class);
-
-  }
-
-  public Column[] toArray() {
-    return this.columns.toArray(new Column[this.columns.size()]);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index e5af491..ad6ef3e 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -113,7 +113,7 @@ message TableDescProto {
 	required TableProto meta = 3;
 	required SchemaProto schema = 4;
 	optional TableStatsProto stats = 5;
-	optional PartitionsProto partitions = 6;
+	optional PartitionDescProto partitions = 6;
 }
 
 enum FunctionType {
@@ -234,11 +234,12 @@ message SortSpecProto {
   optional bool nullFirst = 3 [default = false];
 }
 
-message PartitionsProto {
+message PartitionDescProto {
   required PartitionsType partitionsType = 1;
-  repeated ColumnProto columns = 2;
+  optional SchemaProto schema = 2;
   optional int32 numPartitions = 3;
   repeated SpecifierProto specifiers = 4;
+  optional bool isOmitValues = 5;
 }
 
 message SpecifierProto {

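For reference, a minimal sketch of populating the revised message through the generated builder, matching the calls PartitionDesc.getProto() makes above; the column name and wrapper class are only examples.

import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes.Type;

public class PartitionDescProtoSketch {
  public static void main(String[] args) {
    Schema partitionColumns = new Schema(new Column[]{new Column("type", Type.TEXT)});

    // The partition columns now travel as a full SchemaProto (field 2),
    // and isOmitValues (field 5) is carried alongside them.
    CatalogProtos.PartitionDescProto proto = CatalogProtos.PartitionDescProto.newBuilder()
        .setPartitionsType(CatalogProtos.PartitionsType.COLUMN)
        .setSchema(partitionColumns.getProto())
        .setNumPartitions(0)
        .setIsOmitValues(false)
        .build();

    System.out.println(proto.getSchema().getFieldsCount()); // 1
  }
}
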
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
index 1747e68..1422cf2 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestSchema.java
@@ -140,7 +140,33 @@ public class TestSchema {
     Schema schema2 = new Schema(schema.getProto());
     schema2.setQualifier("test1");
     Column column = schema2.getColumn(1);
+    assertEquals(1, schema2.getColumnIdByName("age"));
     assertEquals(column, schema2.getColumnByName("age"));
     assertEquals(column, schema2.getColumnByFQN("test1.age"));
+
+    Schema schema3 = new Schema();
+    schema3.addColumn("tb1.col1", Type.INT4);
+    schema3.addColumn("col2", Type.INT4);
+    assertEquals("tb1", schema3.getColumn(0).getQualifier());
+    assertEquals("tb1.col1", schema3.getColumn(0).getQualifiedName());
+    assertEquals("col1", schema3.getColumn(0).getColumnName());
+    assertEquals("col2", schema3.getColumn(1).getQualifiedName());
+
+    assertEquals(schema3.getColumn(0), schema3.getColumnByName("col1"));
+    assertEquals(schema3.getColumn(0), schema3.getColumnByFQN("tb1.col1"));
+    assertEquals(schema3.getColumn(1), schema3.getColumnByName("col2"));
+    assertEquals(schema3.getColumn(1), schema3.getColumnByFQN("col2"));
+
+    schema3.setQualifier("tb2");
+    assertEquals("tb2", schema3.getColumn(0).getQualifier());
+    assertEquals("tb2.col1", schema3.getColumn(0).getQualifiedName());
+    assertEquals("col1", schema3.getColumn(0).getColumnName());
+    assertEquals("tb2.col2", schema3.getColumn(1).getQualifiedName());
+
+    assertEquals(schema3.getColumn(0), schema3.getColumnByName("col1"));
+    assertEquals(schema3.getColumn(0), schema3.getColumnByFQN("tb2.col1"));
+    assertEquals(schema3.getColumn(1), schema3.getColumnByName("col2"));
+    assertEquals(schema3.getColumn(1), schema3.getColumnByFQN("tb2.col2"));
+
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
index e7a862a..74c7cb5 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
@@ -30,7 +30,7 @@ import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
@@ -106,7 +106,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
     org.apache.tajo.catalog.Schema schema = null;
     Options options = null;
     TableStats stats = null;
-    Partitions partitions = null;
+    PartitionDesc partitions = null;
 
     // get db name and table name.
     try {
@@ -178,7 +178,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
       // set partition keys
       if (table.getPartitionKeys() != null) {
         if (table.getPartitionKeys().size() > 0) {
-          partitions = new Partitions();
+          partitions = new PartitionDesc();
           List<FieldSchema> partitionKeys = table.getPartitionKeys();
           for(int i = 0; i < partitionKeys.size(); i++) {
             FieldSchema fieldSchema = partitionKeys.get(i);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
index d8ee85e..9ab208f 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
@@ -265,7 +265,7 @@ public class TestHCatalogStore {
     assertEquals("type", columns.get(4).getColumnName());
     assertEquals(TajoDataTypes.Type.TEXT, columns.get(4).getDataType().getType());
     assertNotNull(table.getPartitions());
-    assertEquals("type", table.getPartitions().getColumn(0).getColumnName());
+    assertEquals("type", table.getPartitions().getSchema().getColumn(0).getColumnName());
     assertEquals(CatalogProtos.PartitionsType.COLUMN, table.getPartitions().getPartitionsType());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 3414e83..60c88e9 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -26,7 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
@@ -284,8 +284,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       //Partition
       if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
         try {
-          Partitions partitions = table.getPartitions();
-          List<Column> columnList = partitions.getColumns();
+          PartitionDesc partitionDesc = table.getPartitions();
+          List<Column> columnList = partitionDesc.getColumns();
 
           // Find columns which used for a partitioned table.
           StringBuffer columns = new StringBuffer();
@@ -320,19 +320,19 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
           }
 
           // Find information for subpartitions
-          if (partitions.getSpecifiers() != null) {
+          if (partitionDesc.getSpecifiers() != null) {
             int count = 1;
-            if (partitions.getSpecifiers().size() == 0) {
+            if (partitionDesc.getSpecifiers().size() == 0) {
               pstmt.clearParameters();
               pstmt.setString(1, null);
               pstmt.setInt(2, tid);
-              pstmt.setString(3, partitions.getPartitionsType().name());
-              pstmt.setInt(4, partitions.getNumPartitions());
+              pstmt.setString(3, partitionDesc.getPartitionsType().name());
+              pstmt.setInt(4, partitionDesc.getNumPartitions());
               pstmt.setString(5, columns.toString());
               pstmt.setString(6, null);
               pstmt.addBatch();
             } else {
-              for(Specifier specifier: partitions.getSpecifiers()) {
+              for(Specifier specifier: partitionDesc.getSpecifiers()) {
                 pstmt.clearParameters();
                 if (specifier.getName() != null && !specifier.getName().equals("")) {
                   pstmt.setString(1, specifier.getName());
@@ -340,8 +340,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
                   pstmt.setString(1, null);
                 }
                 pstmt.setInt(2, tid);
-                pstmt.setString(3, partitions.getPartitionsType().name());
-                pstmt.setInt(4, partitions.getNumPartitions());
+                pstmt.setString(3, partitionDesc.getPartitionsType().name());
+                pstmt.setInt(4, partitionDesc.getNumPartitions());
                 pstmt.setString(5, columns.toString());
                 pstmt.setString(6, specifier.getExpressions());
                 pstmt.addBatch();
@@ -352,8 +352,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
             pstmt.clearParameters();
             pstmt.setString(1, null);
             pstmt.setInt(2, tid);
-            pstmt.setString(3, partitions.getPartitionsType().name());
-            pstmt.setInt(4, partitions.getNumPartitions());
+            pstmt.setString(3, partitionDesc.getPartitionsType().name());
+            pstmt.setInt(4, partitionDesc.getNumPartitions());
             pstmt.setString(5, columns.toString());
             pstmt.setString(6, null);
             pstmt.addBatch();
@@ -496,7 +496,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
     StoreType storeType = null;
     Options options;
     TableStats stat = null;
-    Partitions partitions = null;
+    PartitionDesc partitionDesc = null;
     int tid = 0;
 
     try {
@@ -602,18 +602,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       res = stmt.executeQuery(sql);
 
       while (res.next()) {
-        if (partitions == null) {
-          partitions = new Partitions();
+        if (partitionDesc == null) {
+          partitionDesc = new PartitionDesc();
           String[] columns = res.getString("columns").split(",");
           for(String eachColumn: columns) {
-            partitions.addColumn(getColumn(tableName, tid, eachColumn));
+            partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
           }
-          partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
-          partitions.setNumPartitions(res.getInt("quantity"));
+          partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString("type")));
+          partitionDesc.setNumPartitions(res.getInt("quantity"));
         }
 
         Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
-        partitions.addSpecifier(specifier);
+        partitionDesc.addSpecifier(specifier);
       }
 
     } catch (SQLException se) {
@@ -628,8 +628,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
       table.setStats(stat);
     }
 
-    if (partitions != null) {
-      table.setPartitions(partitions);
+    if (partitionDesc != null) {
+      table.setPartitions(partitionDesc);
     }
 
     return table;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
index 06a701b..f21ab0e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DerbyStore.java
@@ -24,7 +24,7 @@ package org.apache.tajo.catalog.store;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
@@ -375,8 +375,8 @@ public class DerbyStore extends AbstractDBStore {
       //Partition
       if (table.getPartitions() != null && !table.getPartitions().toString().isEmpty()) {
         try {
-          Partitions partitions = table.getPartitions();
-          List<Column> columnList = partitions.getColumns();
+          PartitionDesc partitionDesc = table.getPartitions();
+          List<Column> columnList = partitionDesc.getColumns();
 
           // Find columns which used for a partitioned table.
           StringBuffer columns = new StringBuffer();
@@ -411,19 +411,19 @@ public class DerbyStore extends AbstractDBStore {
           }
 
           // Find information for subpartitions
-          if (partitions.getSpecifiers() != null) {
+          if (partitionDesc.getSpecifiers() != null) {
             int count = 1;
-            if (partitions.getSpecifiers().size() == 0) {
+            if (partitionDesc.getSpecifiers().size() == 0) {
               pstmt.clearParameters();
               pstmt.setString(1, null);
               pstmt.setInt(2, tid);
-              pstmt.setString(3, partitions.getPartitionsType().name());
-              pstmt.setInt(4, partitions.getNumPartitions());
+              pstmt.setString(3, partitionDesc.getPartitionsType().name());
+              pstmt.setInt(4, partitionDesc.getNumPartitions());
               pstmt.setString(5, columns.toString());
               pstmt.setString(6, null);
               pstmt.addBatch();
             } else {
-              for(Specifier eachValue: partitions.getSpecifiers()) {
+              for(Specifier eachValue: partitionDesc.getSpecifiers()) {
                 pstmt.clearParameters();
                 if (eachValue.getName() != null && !eachValue.getName().equals("")) {
                   pstmt.setString(1, eachValue.getName());
@@ -431,8 +431,8 @@ public class DerbyStore extends AbstractDBStore {
                   pstmt.setString(1, null);
                 }
                 pstmt.setInt(2, tid);
-                pstmt.setString(3, partitions.getPartitionsType().name());
-                pstmt.setInt(4, partitions.getNumPartitions());
+                pstmt.setString(3, partitionDesc.getPartitionsType().name());
+                pstmt.setInt(4, partitionDesc.getNumPartitions());
                 pstmt.setString(5, columns.toString());
                 pstmt.setString(6, eachValue.getExpressions());
                 pstmt.addBatch();
@@ -443,8 +443,8 @@ public class DerbyStore extends AbstractDBStore {
             pstmt.clearParameters();
             pstmt.setString(1, null);
             pstmt.setInt(2, tid);
-            pstmt.setString(3, partitions.getPartitionsType().name());
-            pstmt.setInt(4, partitions.getNumPartitions());
+            pstmt.setString(3, partitionDesc.getPartitionsType().name());
+            pstmt.setInt(4, partitionDesc.getNumPartitions());
             pstmt.setString(5, columns.toString());
             pstmt.setString(6, null);
             pstmt.addBatch();
@@ -596,7 +596,7 @@ public class DerbyStore extends AbstractDBStore {
     StoreType storeType = null;
     Options options;
     TableStats stat = null;
-    Partitions partitions = null;
+    PartitionDesc partitionDesc = null;
     int tid = 0;
 
     try {
@@ -706,19 +706,19 @@ public class DerbyStore extends AbstractDBStore {
         res = stmt.executeQuery(sql);
 
         while (res.next()) {
-          if (partitions == null) {
-            partitions = new Partitions();
+          if (partitionDesc == null) {
+            partitionDesc = new PartitionDesc();
             String[] columns = res.getString("columns").split(",");
             for(String eachColumn: columns) {
-              partitions.addColumn(getColumn(tableName, tid, eachColumn));
+              partitionDesc.addColumn(getColumn(tableName, tid, eachColumn));
             }
-            partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
+            partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(res.getString
                 ("type")));
-            partitions.setNumPartitions(res.getInt("quantity"));
+            partitionDesc.setNumPartitions(res.getInt("quantity"));
           }
 
           Specifier specifier = new Specifier(res.getString("name"), res.getString("expressions"));
-          partitions.addSpecifier(specifier);
+          partitionDesc.addSpecifier(specifier);
         }
 
       } catch (SQLException se) {
@@ -733,8 +733,8 @@ public class DerbyStore extends AbstractDBStore {
         table.setStats(stat);
       }
 
-      if (partitions != null) {
-        table.setPartitions(partitions);
+      if (partitionDesc != null) {
+        table.setPartitions(partitionDesc);
       }
 
       return table;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
index d174e72..3abbb81 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java
@@ -20,7 +20,7 @@ package org.apache.tajo.catalog;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.function.Function;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
@@ -214,13 +214,13 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitions.setNumPartitions(2);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitionDesc.setNumPartitions(2);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
 
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
@@ -229,7 +229,7 @@ public class TestCatalog {
 
     assertEquals(retrieved.getName(), tableName);
     assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
-    assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
     assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
 
     catalog.deleteTable(tableName);
@@ -250,17 +250,17 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitions.setNumPartitions(2);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitionDesc.setNumPartitions(2);
 
-    partitions.addSpecifier(new Specifier("sub_part1"));
-    partitions.addSpecifier(new Specifier("sub_part2"));
-    partitions.addSpecifier(new Specifier("sub_part3"));
+    partitionDesc.addSpecifier(new Specifier("sub_part1"));
+    partitionDesc.addSpecifier(new Specifier("sub_part2"));
+    partitionDesc.addSpecifier(new Specifier("sub_part3"));
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -269,7 +269,7 @@ public class TestCatalog {
 
     assertEquals(retrieved.getName(), tableName);
     assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.HASH);
-    assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
     assertEquals(retrieved.getPartitions().getNumPartitions(), 2);
     assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
         "sub_part1");
@@ -295,15 +295,15 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
 
-    partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
-    partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+    partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+    partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -312,7 +312,7 @@ public class TestCatalog {
 
     assertEquals(retrieved.getName(), tableName);
     assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.LIST);
-    assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
     assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
         "sub_part1");
     assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
@@ -339,16 +339,16 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
 
-    partitions.addSpecifier(new Specifier("sub_part1", "2"));
-    partitions.addSpecifier(new Specifier("sub_part2", "5"));
-    partitions.addSpecifier(new Specifier("sub_part3"));
+    partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
+    partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
+    partitionDesc.addSpecifier(new Specifier("sub_part3"));
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -357,7 +357,7 @@ public class TestCatalog {
 
     assertEquals(retrieved.getName(), tableName);
     assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.RANGE);
-    assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
     assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getName(),
         "sub_part1");
     assertEquals(retrieved.getPartitions().getSpecifiers().get(0).getExpressions(),
@@ -388,12 +388,12 @@ public class TestCatalog {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(catalog.existsTable(tableName));
     catalog.addTable(desc);
     assertTrue(catalog.existsTable(tableName));
@@ -402,7 +402,7 @@ public class TestCatalog {
 
     assertEquals(retrieved.getName(), tableName);
     assertEquals(retrieved.getPartitions().getPartitionsType(), CatalogProtos.PartitionsType.COLUMN);
-    assertEquals(retrieved.getPartitions().getColumn(0).getColumnName(), "id");
+    assertEquals(retrieved.getPartitions().getSchema().getColumn(0).getColumnName(), "id");
 
     catalog.deleteTable(tableName);
     assertFalse(catalog.existsTable(tableName));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
index d3671b3..1949afc 100644
--- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestDBStore.java
@@ -22,7 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -236,13 +236,13 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitions.setNumPartitions(2);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitionDesc.setNumPartitions(2);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(store.existTable(tableName));
     store.addTable(desc);
     assertTrue(store.existTable(tableName));
@@ -268,17 +268,17 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.HASH);
-    partitions.setNumPartitions(2);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.HASH);
+    partitionDesc.setNumPartitions(2);
 
-    partitions.addSpecifier(new Specifier("sub_part1"));
-    partitions.addSpecifier(new Specifier("sub_part2"));
-    partitions.addSpecifier(new Specifier("sub_part3"));
+    partitionDesc.addSpecifier(new Specifier("sub_part1"));
+    partitionDesc.addSpecifier(new Specifier("sub_part2"));
+    partitionDesc.addSpecifier(new Specifier("sub_part3"));
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(store.existTable(tableName));
     store.addTable(desc);
     assertTrue(store.existTable(tableName));
@@ -304,15 +304,15 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.LIST);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.LIST);
 
-    partitions.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
-    partitions.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
+    partitionDesc.addSpecifier(new Specifier("sub_part1", "Seoul,서울"));
+    partitionDesc.addSpecifier(new Specifier("sub_part2", "Busan,부산"));
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(store.existTable(tableName));
     store.addTable(desc);
     assertTrue(store.existTable(tableName));
@@ -338,16 +338,16 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.RANGE);
 
-    partitions.addSpecifier(new Specifier("sub_part1", "2"));
-    partitions.addSpecifier(new Specifier("sub_part2", "5"));
-    partitions.addSpecifier(new Specifier("sub_part3"));
+    partitionDesc.addSpecifier(new Specifier("sub_part1", "2"));
+    partitionDesc.addSpecifier(new Specifier("sub_part2", "5"));
+    partitionDesc.addSpecifier(new Specifier("sub_part3"));
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(store.existTable(tableName));
     store.addTable(desc);
     assertTrue(store.existTable(tableName));
@@ -373,12 +373,12 @@ public class TestDBStore {
     opts.put("file.delimiter", ",");
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, opts);
 
-    Partitions partitions = new Partitions();
-    partitions.addColumn(new Column("id", Type.INT4));
-    partitions.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.addColumn(new Column("id", Type.INT4));
+    partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.COLUMN);
 
     TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable"));
-    desc.setPartitions(partitions);
+    desc.setPartitions(partitionDesc);
     assertFalse(store.existTable(tableName));
     store.addTable(desc);
     assertTrue(store.existTable(tableName));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
index bf4d83d..56fd3b5 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/BooleanDatum.java
@@ -24,6 +24,8 @@ import org.apache.tajo.datum.exception.InvalidOperationException;
 
 public class BooleanDatum extends Datum {
 	@Expose private boolean val;
+  public static final String TRUE = "t";
+  public static final String FALSE = "f";
 
   public BooleanDatum() {
     super(TajoDataTypes.Type.BOOLEAN);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
index 343bbfe..2082225 100644
--- a/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/datum/DatumFactory.java
@@ -69,6 +69,40 @@ public class DatumFactory {
     }
   }
 
+  public static Datum createFromString(DataType dataType, String value) {
+    switch (dataType.getType()) {
+
+    case BOOLEAN:
+      return createBool(value.equals(BooleanDatum.TRUE));
+    case INT2:
+      return createInt2(value);
+    case INT4:
+      return createInt4(value);
+    case INT8:
+      return createInt8(value);
+    case FLOAT4:
+      return createFloat4(value);
+    case FLOAT8:
+      return createFloat8(value);
+    case CHAR:
+      return createChar(value);
+    case TEXT:
+      return createText(value);
+    case DATE:
+      return createDate(value);
+    case TIME:
+      return createTime(value);
+    case TIMESTAMP:
+      return createTimeStamp(value);
+    case BLOB:
+      return createBlob(value);
+    case INET4:
+      return createInet4(value);
+    default:
+      throw new UnsupportedOperationException(dataType.toString());
+    }
+  }
+
   public static Datum createFromBytes(DataType dataType, byte[] bytes) {
     switch (dataType.getType()) {
 

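For readers skimming the patch, a minimal usage sketch of the new createFromString
helper (not part of the patch). Column, DatumFactory and BooleanDatum appear in the
hunks above; the import path for Type and the Column#getDataType() accessor are
assumptions here.

  import org.apache.tajo.catalog.Column;
  import org.apache.tajo.common.TajoDataTypes.Type;   // assumed location of Type
  import org.apache.tajo.datum.BooleanDatum;
  import org.apache.tajo.datum.Datum;
  import org.apache.tajo.datum.DatumFactory;

  public class CreateFromStringSketch {
    public static void main(String[] args) {
      // Restore a column value from its string form, e.g. the "42" in a
      // column partition path segment such as "id=42".
      Column idColumn = new Column("id", Type.INT4);
      Datum id = DatumFactory.createFromString(idColumn.getDataType(), "42");  // getDataType() assumed

      // BOOLEAN strings are matched against BooleanDatum.TRUE ("t").
      Column flagColumn = new Column("flag", Type.BOOLEAN);
      Datum flag = DatumFactory.createFromString(flagColumn.getDataType(), BooleanDatum.TRUE);

      System.out.println(id + " / " + flag.asBool());
    }
  }
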
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
index 5b5a406..2f81ef4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TUtil.java
@@ -183,7 +183,7 @@ public class TUtil {
   }
 
   public static String arrayToString(Object [] objects) {
-    boolean first = false;
+    boolean first = true;
     StringBuilder sb = new StringBuilder();
     for(Object object : objects) {
       if (first) {

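The one-character fix above restores the usual first-element flag. A standalone
sketch of that pattern follows (not the Tajo source; the loop body and the comma
separator are assumptions, since the hunk only shows the flag and the loop header):

  // With `first` initialized to false, the else-branch fires for the very first
  // element too, producing a leading separator.
  public static String arrayToString(Object[] objects) {
    boolean first = true;
    StringBuilder sb = new StringBuilder();
    for (Object object : objects) {
      if (first) {
        first = false;        // skip the separator exactly once
      } else {
        sb.append(",");       // assumed separator
      }
      sb.append(object);
    }
    return sb.toString();
  }
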
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4 b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
index a19f3e9..3bc9ff4 100644
--- a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
+++ b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLLexer.g4
@@ -290,7 +290,6 @@ Nonreserved_keywords
 
   | MAXVALUE
 
-  | VALUES
   | PARTITION
   | PARTITIONS
   | ROLLUP
@@ -305,6 +304,8 @@ Nonreserved_keywords
   | THAN
   | TRIM
   | TO
+
+  | VALUES
   ;
 
 /*

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
index 0728008..54e43c7 100644
--- a/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
+++ b/tajo-core/tajo-core-backend/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4
@@ -152,7 +152,7 @@ list_value_partition
   ;
 
 column_partitions
-  : PARTITION BY COLUMN LEFT_PAREN column_reference_list RIGHT_PAREN
+  : PARTITION BY COLUMN table_elements
   ;
 
 partition_name

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
index a5ee425..e933ddb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
@@ -20,6 +20,8 @@ package org.apache.tajo.engine.eval;
 
 import org.apache.tajo.catalog.Column;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 public class AlgebraicUtil {
@@ -51,7 +53,7 @@ public class AlgebraicUtil {
   }
   
   private static EvalNode _transpose(EvalNode _expr, Column target) {
-     EvalNode expr = simplify(_expr);
+     EvalNode expr = eliminateConstantExprs(_expr);
      
      if (isSingleVar(expr.getLeftExpr())) {
        return expr;
@@ -128,7 +130,7 @@ public class AlgebraicUtil {
    * @param expr to be simplified
    * @return the simplified expr
    */
-  public static EvalNode simplify(EvalNode expr) {
+  public static EvalNode eliminateConstantExprs(EvalNode expr) {
     EvalNode left = expr.getLeftExpr();
     EvalNode right = expr.getRightExpr();
     
@@ -136,31 +138,27 @@ public class AlgebraicUtil {
     case AND:
     case OR:
     case EQUAL:
+    case NOT_EQUAL:
     case LTH:
     case LEQ:
     case GTH:
     case GEQ:
-      left = simplify(left);
-      right = simplify(right);      
-      return new BinaryEval(expr.getType(), left, right);    
-    
     case PLUS:
     case MINUS:
     case MULTIPLY:
     case DIVIDE:
-      left = simplify(left);
-      right = simplify(right);
-      
-      // If both are constants, they can be evaluated immediately.
-      if (left.getType() == EvalType.CONST
-          && right.getType() == EvalType.CONST) {
+    case MODULAR:
+      left = eliminateConstantExprs(left);
+      right = eliminateConstantExprs(right);
+
+      if (left.getType() == EvalType.CONST && right.getType() == EvalType.CONST) {
         EvalContext exprCtx = expr.newContext();
         expr.eval(exprCtx, null, null);
         return new ConstEval(expr.terminate(exprCtx));
       } else {
-        return new BinaryEval(expr.getType(), left, right);            
+        return new BinaryEval(expr.getType(), left, right);
       }
-      
+
     case CONST:
       return expr;
       
@@ -286,4 +284,112 @@ public class AlgebraicUtil {
     
     return expr;
   }
+
+  public static boolean isComparisonOperator(EvalNode expr) {
+    return expr.getType() == EvalType.EQUAL ||
+        expr.getType() == EvalType.LEQ ||
+        expr.getType() == EvalType.LTH ||
+        expr.getType() == EvalType.GEQ ||
+        expr.getType() == EvalType.GTH ||
+        expr.getType() == EvalType.BETWEEN;
+  }
+
+  public static boolean isIndexableOperator(EvalNode expr) {
+    return expr.getType() == EvalType.EQUAL ||
+        expr.getType() == EvalType.LEQ ||
+        expr.getType() == EvalType.LTH ||
+        expr.getType() == EvalType.GEQ ||
+        expr.getType() == EvalType.GTH ||
+        expr.getType() == EvalType.BETWEEN ||
+        expr.getType() == EvalType.IN ||
+        (expr.getType() == EvalType.LIKE && !((LikePredicateEval)expr).isLeadingWildCard());
+  }
+
+  /**
+   * Convert a list of conjunctive normal forms into a singleton expression.
+   *
+   * @param cnfExprs
+   * @return The EvalNode object that merges all CNF-formed expressions.
+   */
+  public static EvalNode createSingletonExprFromCNF(EvalNode... cnfExprs) {
+    if (cnfExprs.length == 1) {
+      return cnfExprs[0];
+    }
+
+    return createSingletonExprFromCNFRecursive(cnfExprs, 0);
+  }
+
+  private static EvalNode createSingletonExprFromCNFRecursive(EvalNode[] evalNode, int idx) {
+    if (idx == evalNode.length - 2) {
+      return new BinaryEval(EvalType.AND, evalNode[idx], evalNode[idx + 1]);
+    } else {
+      return new BinaryEval(EvalType.AND, evalNode[idx], createSingletonExprFromCNFRecursive(evalNode, idx + 1));
+    }
+  }
+
+  /**
+   * Transforms an expression into an array of conjunctive normal form (CNF) expressions.
+   *
+   * @param expr The expression to be transformed to an array of CNF-formed expressions.
+   * @return An array of CNF-formed expressions
+   */
+  public static EvalNode [] toConjunctiveNormalFormArray(EvalNode expr) {
+    List<EvalNode> list = new ArrayList<EvalNode>();
+    toConjunctiveNormalFormArrayRecursive(expr, list);
+    return list.toArray(new EvalNode[list.size()]);
+  }
+
+  private static void toConjunctiveNormalFormArrayRecursive(EvalNode node, List<EvalNode> found) {
+    if (node.getType() == EvalType.AND) {
+      toConjunctiveNormalFormArrayRecursive(node.getLeftExpr(), found);
+      toConjunctiveNormalFormArrayRecursive(node.getRightExpr(), found);
+    } else {
+      found.add(node);
+    }
+  }
+
+  /**
+   * Convert a list of disjunctive normal forms into a singleton expression.
+   *
+   * @param cnfExprs
+   * @return The EvalNode object that merges all DNF-formed expressions.
+   */
+  public static EvalNode createSingletonExprFromDNF(EvalNode... cnfExprs) {
+    if (cnfExprs.length == 1) {
+      return cnfExprs[0];
+    }
+
+    return createSingletonExprFromDNFRecursive(cnfExprs, 0);
+  }
+
+  private static EvalNode createSingletonExprFromDNFRecursive(EvalNode[] evalNode, int idx) {
+    if (idx == evalNode.length - 2) {
+      return new BinaryEval(EvalType.OR, evalNode[idx], evalNode[idx + 1]);
+    } else {
+      return new BinaryEval(EvalType.OR, evalNode[idx], createSingletonExprFromDNFRecursive(evalNode, idx + 1));
+    }
+  }
+
+  /**
+   * Transforms expressions into an array of disjunctive normal form (DNF) expressions.
+   *
+   * @param exprs The expressions to be transformed to an array of DNF-formed expressions.
+   * @return An array of DNF-formed expressions
+   */
+  public static EvalNode [] toDisjunctiveNormalFormArray(EvalNode...exprs) {
+    List<EvalNode> list = new ArrayList<EvalNode>();
+    for (EvalNode expr : exprs) {
+      toDisjunctiveNormalFormArrayRecursive(expr, list);
+    }
+    return list.toArray(new EvalNode[list.size()]);
+  }
+
+  private static void toDisjunctiveNormalFormArrayRecursive(EvalNode node, List<EvalNode> found) {
+    if (node.getType() == EvalType.OR) {
+      toDisjunctiveNormalFormArrayRecursive(node.getLeftExpr(), found);
+      toDisjunctiveNormalFormArrayRecursive(node.getRightExpr(), found);
+    } else {
+      found.add(node);
+    }
+  }
 }
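
For context on the new AlgebraicUtil helpers, here is a compact sketch (not part of
the patch) exercising constant folding, CNF/DNF splitting and merging, and the
indexability check. It uses only constructors and calls visible in this file, and
assumes BinaryEval, ConstEval, EvalNode and EvalType live in the same
org.apache.tajo.engine.eval package as AlgebraicUtil.

  import org.apache.tajo.datum.DatumFactory;
  import org.apache.tajo.engine.eval.AlgebraicUtil;
  import org.apache.tajo.engine.eval.BinaryEval;
  import org.apache.tajo.engine.eval.ConstEval;
  import org.apache.tajo.engine.eval.EvalNode;
  import org.apache.tajo.engine.eval.EvalType;

  public class AlgebraicUtilSketch {
    public static void main(String[] args) {
      // 1. Constant folding: (1 + 2) collapses to a single ConstEval.
      EvalNode onePlusTwo = new BinaryEval(EvalType.PLUS,
          new ConstEval(DatumFactory.createInt4("1")),
          new ConstEval(DatumFactory.createInt4("2")));
      EvalNode folded = AlgebraicUtil.eliminateConstantExprs(onePlusTwo);
      System.out.println(folded.getType());                    // CONST

      // 2. CNF: (a AND b) AND c  ->  {a, b, c}  ->  a AND (b AND c)
      EvalNode a = equalsPredicate("1", "1");
      EvalNode b = equalsPredicate("2", "2");
      EvalNode c = equalsPredicate("3", "3");
      EvalNode conjunction = new BinaryEval(EvalType.AND,
          new BinaryEval(EvalType.AND, a, b), c);
      EvalNode[] cnf = AlgebraicUtil.toConjunctiveNormalFormArray(conjunction);
      EvalNode merged = AlgebraicUtil.createSingletonExprFromCNF(cnf);
      System.out.println(cnf.length + " " + merged.getType()); // 3 AND

      // 3. DNF does the same over ORs.
      EvalNode[] dnf = AlgebraicUtil.toDisjunctiveNormalFormArray(
          new BinaryEval(EvalType.OR, a, b));
      System.out.println(dnf.length);                          // 2

      // 4. Simple comparisons count as both comparison and indexable operators.
      System.out.println(AlgebraicUtil.isIndexableOperator(a)); // true
    }

    // Leaf predicate such as (l = r), built only from ConstEval/BinaryEval.
    private static EvalNode equalsPredicate(String l, String r) {
      return new BinaryEval(EvalType.EQUAL,
          new ConstEval(DatumFactory.createInt4(l)),
          new ConstEval(DatumFactory.createInt4(r)));
    }
  }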