You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2013/12/28 07:36:19 UTC

[22/50] [abbrv] TAJO-338 - Add Query Optimization Part for Column-Partitioned Tables. (hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 638c92a..d262bc3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.eval;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -31,8 +30,8 @@ import org.apache.tajo.exception.InternalException;
 import java.util.*;
 
 public class EvalTreeUtil {
-  public static void changeColumnRef(EvalNode node, String oldName,
-      String newName) {
+
+  public static void changeColumnRef(EvalNode node, String oldName, String newName) {
     node.postOrder(new ChangeColumnRefVisitor(oldName, newName));
   }
 
@@ -80,50 +79,9 @@ public class EvalTreeUtil {
     node.postOrder(finder);
     return finder.getColumnRefs();
   }
-  
-  /**
-   * Convert a list of conjunctive normal forms into a singleton expression.
-   *  
-   * @param cnfExprs
-   * @return The EvalNode object that merges all CNF-formed expressions.
-   */
-  public static EvalNode transformCNF2Singleton(EvalNode...cnfExprs) {
-    if (cnfExprs.length == 1) {
-      return cnfExprs[0];
-    }
-    
-    return transformCNF2Singleton_(cnfExprs, 0);
-  }
-  
-  private static EvalNode transformCNF2Singleton_(EvalNode [] evalNode, int idx) {
-    if (idx == evalNode.length - 2) {
-      return new BinaryEval(EvalType.AND, evalNode[idx], evalNode[idx + 1]);
-    } else {
-      return new BinaryEval(EvalType.AND, evalNode[idx],
-          transformCNF2Singleton_(evalNode, idx + 1));
-    }
-  }
-  
-  /**
-   * Get a list of exprs similar to CNF
-   * 
-   * @param expr The expression to be transformed to an array of CNF-formed expressions.
-   * @return An array of CNF-formed expressions
-   */
-  public static EvalNode [] getConjNormalForm(EvalNode expr) {
-    List<EvalNode> list = new ArrayList<EvalNode>();    
-    getConjNormalForm(expr, list);
-    return list.toArray(new EvalNode[list.size()]);
-  }
-  
-  private static void getConjNormalForm(EvalNode node, List<EvalNode> found) {
-    if (node.getType() == EvalType.AND) {
-      getConjNormalForm(node.getLeftExpr(), found);
-      getConjNormalForm(node.getRightExpr(), found);
-    } else {
-      found.add(node);
-    }
-  }
+
+
+
   
   public static Schema getSchemaByTargets(Schema inputSchema, Target [] targets) 
       throws InternalException {
@@ -234,16 +192,8 @@ public class EvalTreeUtil {
     }    
   }
 
-  public static boolean isComparisonOperator(EvalNode expr) {
-    return expr.getType() == EvalType.EQUAL ||
-        expr.getType() == EvalType.LEQ ||
-        expr.getType() == EvalType.LTH ||
-        expr.getType() == EvalType.GEQ ||
-        expr.getType() == EvalType.GTH;
-  }
-
   public static boolean isJoinQual(EvalNode expr) {
-    return isComparisonOperator(expr) &&
+    return AlgebraicUtil.isComparisonOperator(expr) &&
         expr.getLeftExpr().getType() == EvalType.FIELD &&
         expr.getRightExpr().getType() == EvalType.FIELD;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
index e2411e3..59e8b31 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
@@ -71,7 +71,7 @@ public class InEval extends BinaryEval {
 
     Datum value = tuple.get(fieldId);
     for (Datum datum : values) {
-      if (value.equals(datum)) {
+      if (value.equalsTo(datum).asBool()) {
         isIncluded = true;
         break;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
index ac7aeeb..e1c693e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/LikePredicateEval.java
@@ -42,6 +42,10 @@ public class LikePredicateEval extends PatternMatchPredicateEval {
     this.compiled = Pattern.compile(regex, flags);
   }
 
+  public boolean isLeadingWildCard() {
+    return pattern.indexOf(".*") == 0;
+  }
+
   @Override
   public String toString() {
     return leftExpr.toString() + (caseInsensitive ? "ILIKE" : "LIKE") + "'" + pattern +"'";

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
index 2cd91f9..ad8acf1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java
@@ -906,9 +906,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     }
 
     if (checkIfExist(ctx.table_partitioning_clauses())) {
-      CreateTable.PartitionOption partitionOption =
+      PartitionDescExpr partitionDesc =
           parseTablePartitioningClause(ctx.table_partitioning_clauses());
-      createTable.setPartition(partitionOption);
+      createTable.setPartition(partitionDesc);
     }
     return createTable;
   }
@@ -925,7 +925,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     return elements;
   }
 
-  public CreateTable.PartitionOption parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
+  public PartitionDescExpr parseTablePartitioningClause(SQLParser.Table_partitioning_clausesContext ctx) {
 
     if (checkIfExist(ctx.range_partitions())) { // For Range Partition
       Range_partitionsContext rangePartitionsContext = ctx.range_partitions();
@@ -978,9 +978,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
       return new ListPartition(getColumnReferences(ctx.list_partitions().column_reference_list()), specifiers);
 
     } else if (checkIfExist(ctx.column_partitions())) { // For Column Partition (Hive Style)
-      return new CreateTable.ColumnPartition(getColumnReferences(ctx.column_partitions().column_reference_list()));
+      return new CreateTable.ColumnPartition(getDefinitions(ctx.column_partitions().table_elements()), true);
     } else {
-      throw new SQLSyntaxError("Wrong partition option");
+      throw new SQLSyntaxError("Invalid Partition Type: " + ctx.toStringTree());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
index 9478648..0cee8dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/BasicLogicalPlanVisitor.java
@@ -92,6 +92,9 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
       case SCAN:
         current = visitScan(context, plan, (ScanNode) node, stack);
         break;
+      case PARTITIONS_SCAN:
+        current = visitScan(context, plan, (ScanNode) node, stack);
+        break;
       case STORE:
         current = visitStoreTable(context, plan, (StoreTableNode) node, stack);
         break;
@@ -221,6 +224,11 @@ public class BasicLogicalPlanVisitor<CONTEXT, RESULT> implements LogicalPlanVisi
   }
 
   @Override
+  public RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node, Stack<LogicalNode> stack) throws PlanningException {
+    return null;
+  }
+
+  @Override
   public RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
       throws PlanningException {
     stack.push(node);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
index 640383e..5f11f1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.tajo.algebra.JoinType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
 import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.graph.DirectedGraphCursor;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.logical.join.FoundJoinOrder;
@@ -32,6 +33,7 @@ import org.apache.tajo.engine.planner.logical.join.JoinGraph;
 import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm;
 import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine;
 import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule;
+import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter;
 import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 
 import java.util.Set;
@@ -49,12 +51,13 @@ public class LogicalOptimizer {
   private BasicQueryRewriteEngine rulesAfterToJoinOpt;
   private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
 
-  public LogicalOptimizer() {
+  public LogicalOptimizer(TajoConf systemConf) {
     rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
     rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
 
     rulesAfterToJoinOpt = new BasicQueryRewriteEngine();
     rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule());
+    rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf));
   }
 
   public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
@@ -130,7 +133,7 @@ public class LogicalOptimizer {
     public LogicalNode visitFilter(JoinGraphContext context, LogicalPlan plan, SelectionNode node,
                                    Stack<LogicalNode> stack) throws PlanningException {
       super.visitFilter(context, plan, node, stack);
-      context.quals.addAll(Lists.newArrayList(EvalTreeUtil.getConjNormalForm(node.getQual())));
+      context.quals.addAll(Lists.newArrayList(AlgebraicUtil.toConjunctiveNormalFormArray(node.getQual())));
       return node;
     }
 
@@ -239,7 +242,7 @@ public class LogicalOptimizer {
 
       double filterFactor = 1;
       if (joinNode.hasJoinQual()) {
-        EvalNode [] quals = EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual());
+        EvalNode [] quals = AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual());
         filterFactor = Math.pow(GreedyHeuristicJoinOrderAlgorithm.DEFAULT_SELECTION_FACTOR, quals.length);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
index c06b7a7..c6fcefc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVisitor.java
@@ -47,6 +47,8 @@ public interface LogicalPlanVisitor <CONTEXT, RESULT> {
       throws PlanningException;
   RESULT visitScan(CONTEXT context, LogicalPlan plan, ScanNode node, Stack<LogicalNode> stack)
       throws PlanningException;
+  RESULT visitPartitionedTableScan(CONTEXT context, LogicalPlan plan, PartitionedTableScanNode node,
+                                   Stack<LogicalNode> stack) throws PlanningException;
   RESULT visitStoreTable(CONTEXT context, LogicalPlan plan, StoreTableNode node, Stack<LogicalNode> stack)
       throws PlanningException;
   RESULT visitInsert(CONTEXT context, LogicalPlan plan, InsertNode node, Stack<LogicalNode> stack)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 63b7985..605b9df 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.algebra.CreateTable.ColumnDefinition;
 import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.Specifier;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
@@ -50,10 +50,13 @@ import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.exception.InternalException;
 import org.apache.tajo.util.TUtil;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Stack;
 
 import static org.apache.tajo.algebra.Aggregation.GroupType;
+import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
+import static org.apache.tajo.algebra.CreateTable.PartitionType;
 import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import static org.apache.tajo.engine.planner.LogicalPlan.BlockType;
 
@@ -407,7 +410,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     // 3. build this plan:
     EvalNode searchCondition = createEvalTree(plan, block, selection.getQual());
-    SelectionNode selectionNode = new SelectionNode(plan.newPID(), searchCondition);
+    EvalNode simplified = AlgebraicUtil.eliminateConstantExprs(searchCondition);
+    SelectionNode selectionNode = new SelectionNode(plan.newPID(), simplified);
 
     // 4. set child plan, update input/output schemas:
     selectionNode.setChild(child);
@@ -772,8 +776,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
       return storeNode;
     } else {
-      CreateTableNode createTableNode = new CreateTableNode(context.plan.newPID(), expr.getTableName(),
-          convertTableElementsSchema(expr.getTableElements()));
+
+      Schema tableSchema;
+      if (expr.hasPartition() && expr.getPartition().getPartitionType() == PartitionType.COLUMN &&
+          ((ColumnPartition)expr.getPartition()).isOmitValues()) {
+        ColumnDefinition [] merged = TUtil.concat(expr.getTableElements(),
+            ((ColumnPartition)expr.getPartition()).getColumns());
+        tableSchema = convertTableElementsSchema(merged);
+      } else {
+        tableSchema = convertTableElementsSchema(expr.getTableElements());
+      }
+
+      CreateTableNode createTableNode = new CreateTableNode(
+          context.plan.newPID(),
+          expr.getTableName(),
+          tableSchema);
 
       if (expr.isExternal()) {
         createTableNode.setExternal(true);
@@ -811,28 +828,28 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
    * @return
    * @throws PlanningException
    */
-  private Partitions convertTableElementsPartition(PlanContext context,
+  private PartitionDesc convertTableElementsPartition(PlanContext context,
                                                    CreateTable expr) throws PlanningException {
     Schema schema = convertTableElementsSchema(expr.getTableElements());
-    Partitions partitions = null;
+    PartitionDesc partitionDesc = null;
     List<Specifier> specifiers = null;
     if (expr.hasPartition()) {
-      partitions = new Partitions();
+      partitionDesc = new PartitionDesc();
       specifiers = TUtil.newList();
 
-      partitions.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
+      partitionDesc.setPartitionsType(CatalogProtos.PartitionsType.valueOf(expr.getPartition()
           .getPartitionType().name()));
 
-      if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.HASH)) {
+      if (expr.getPartition().getPartitionType().equals(PartitionType.HASH)) {
         CreateTable.HashPartition hashPartition = expr.getPartition();
 
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
             , hashPartition.getColumns()));
 
         if (hashPartition.getColumns() != null) {
           if (hashPartition.getQuantifier() != null) {
             String quantity = ((LiteralValue)hashPartition.getQuantifier()).getValue();
-            partitions.setNumPartitions(Integer.parseInt(quantity));
+            partitionDesc.setNumPartitions(Integer.parseInt(quantity));
           }
 
           if (hashPartition.getSpecifiers() != null) {
@@ -841,21 +858,21 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             }
           }
 
-          if (specifiers.isEmpty() && partitions.getNumPartitions() > 0) {
-            for (int i = 0; i < partitions.getNumPartitions(); i++) {
-              String partitionName = partitions.getPartitionsType().name() + "_" + expr
+          if (specifiers.isEmpty() && partitionDesc.getNumPartitions() > 0) {
+            for (int i = 0; i < partitionDesc.getNumPartitions(); i++) {
+              String partitionName = partitionDesc.getPartitionsType().name() + "_" + expr
                   .getTableName() + "_" + i;
               specifiers.add(new Specifier(partitionName));
             }
           }
 
           if (!specifiers.isEmpty())
-            partitions.setSpecifiers(specifiers);
+            partitionDesc.setSpecifiers(specifiers);
         }
-      } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.LIST)) {
+      } else if (expr.getPartition().getPartitionType().equals(PartitionType.LIST)) {
         CreateTable.ListPartition listPartition = expr.getPartition();
 
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
             , listPartition.getColumns()));
 
         if (listPartition.getSpecifiers() != null) {
@@ -876,12 +893,12 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             specifiers.add(specifier);
           }
           if (!specifiers.isEmpty())
-            partitions.setSpecifiers(specifiers);
+            partitionDesc.setSpecifiers(specifiers);
         }
-      } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.RANGE)) {
+      } else if (expr.getPartition().getPartitionType().equals(PartitionType.RANGE)) {
         CreateTable.RangePartition rangePartition = expr.getPartition();
 
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
+        partitionDesc.setColumns(convertTableElementsColumns(expr.getTableElements()
             , rangePartition.getColumns()));
 
         if (rangePartition.getSpecifiers() != null) {
@@ -903,17 +920,16 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
             specifiers.add(specifier);
           }
           if (!specifiers.isEmpty())
-            partitions.setSpecifiers(specifiers);
+            partitionDesc.setSpecifiers(specifiers);
         }
-      } else if (expr.getPartition().getPartitionType().equals(CreateTable.PartitionType.COLUMN)) {
-        CreateTable.ColumnPartition columnPartition = expr.getPartition();
-
-        partitions.setColumns(convertTableElementsColumns(expr.getTableElements()
-            , columnPartition.getColumns()));
+      } else if (expr.getPartition().getPartitionType() == PartitionType.COLUMN) {
+        ColumnPartition columnPartition = expr.getPartition();
+        partitionDesc.setColumns(convertTableElementsSchema(columnPartition.getColumns()).getColumns());
+        partitionDesc.setOmitValues(columnPartition.isOmitValues());
       }
     }
 
-    return partitions;
+    return partitionDesc;
   }
 
 
@@ -933,7 +949,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return schema;
   }
 
-  private List<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
+  private Collection<Column> convertTableElementsColumns(CreateTable.ColumnDefinition [] elements,
                                                    ColumnReferenceExpr[] references) {
     List<Column> columnList = TUtil.newList();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index bbecd50..73395a6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -138,7 +138,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         leftExec = createPlanRecursive(ctx, subQueryNode.getSubQuery());
         return leftExec;
 
-      } case SCAN:
+      }
+      case PARTITIONS_SCAN:
+      case SCAN:
         leftExec = createScanPlan(ctx, (ScanNode) logicalNode);
         return leftExec;
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 6b23ed8..d541680 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -545,7 +545,7 @@ public class PlannerUtil {
    * @return true if two operands refers to columns and the operator is comparison,
    */
   public static boolean isJoinQual(EvalNode qual) {
-    if (EvalTreeUtil.isComparisonOperator(qual)) {
+    if (AlgebraicUtil.isComparisonOperator(qual)) {
       List<Column> left = EvalTreeUtil.findAllColumnRefs(qual.getLeftExpr());
       List<Column> right = EvalTreeUtil.findAllColumnRefs(qual.getRightExpr());
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
index a49451c..ac50c46 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Target.java
@@ -48,6 +48,10 @@ public class Target implements Cloneable, GsonObject {
     return !hasAlias() ? column.getQualifiedName() : alias;
   }
 
+  public final void setExpr(EvalNode expr) {
+    this.expr = expr;
+  }
+
   public final void setAlias(String alias) {
     this.alias = alias;
     this.column = new Column(alias, expr.getValueType());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
index 4f936fc..4f3976e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java
@@ -53,6 +53,10 @@ public class ExecutionBlock {
     this.scanlist.clear();
     this.plan = plan;
 
+    if (plan == null) {
+      return;
+    }
+
     LogicalNode node = plan;
     ArrayList<LogicalNode> s = new ArrayList<LogicalNode>();
     s.add(node);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 6ef35ce..abf5620 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -112,7 +112,7 @@ public class GlobalPlanner {
   }
 
   private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNode,
-                                          ExecutionBlock leftBlock, ExecutionBlock rightBlock)
+                                       ExecutionBlock leftBlock, ExecutionBlock rightBlock)
       throws PlanningException {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
index 942309d..d0c8373 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/CreateTableNode.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
@@ -36,7 +36,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
   @Expose private Path path;
   @Expose private Options options;
   @Expose private boolean external;
-  @Expose private Partitions partitions;
+  @Expose private PartitionDesc partitionDesc;
 
   public CreateTableNode(int pid, String tableName, Schema schema) {
     super(pid, NodeType.CREATE_TABLE);
@@ -92,16 +92,16 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     this.external = external;
   }
 
-  public Partitions getPartitions() {
-    return partitions;
+  public PartitionDesc getPartitions() {
+    return partitionDesc;
   }
 
-  public void setPartitions(Partitions partitions) {
-    this.partitions = partitions;
+  public void setPartitions(PartitionDesc partitionDesc) {
+    this.partitionDesc = partitionDesc;
   }
 
   public boolean hasPartition() {
-    return this.partitions != null;
+    return this.partitionDesc != null;
   }
 
   @Override
@@ -121,7 +121,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
           && TUtil.checkEquals(path, other.path)
           && TUtil.checkEquals(options, other.options)
           && TUtil.checkEquals(partitionKeys, other.partitionKeys)
-          && TUtil.checkEquals(partitions, other.partitions);
+          && TUtil.checkEquals(partitionDesc, other.partitionDesc);
     } else {
       return false;
     }
@@ -137,7 +137,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     store.path = path != null ? new Path(path.toString()) : null;
     store.partitionKeys = partitionKeys != null ? partitionKeys.clone() : null;
     store.options = (Options) (options != null ? options.clone() : null);
-    store.partitions = (Partitions) (partitions != null ? partitions.clone() : null);
+    store.partitionDesc = (PartitionDesc) (partitionDesc != null ? partitionDesc.clone() : null);
     return store;
   }
   
@@ -157,7 +157,7 @@ public class CreateTableNode extends LogicalNode implements Cloneable {
     sb.append(",\"storeType\": \"" + this.storageType);
     sb.append(",\"path\" : \"" + this.path).append("\",");
     sb.append(",\"external\" : \"" + this.external).append("\",");
-    sb.append(",\"partitions\" : \"" + this.partitions).append("\",");
+    sb.append(",\"partitions\" : \"" + this.partitionDesc).append("\",");
     
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
     .append("\n  \"in schema\": ").append(getInSchema())

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
index 44790ec..eaaf0c7 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/NodeType.java
@@ -37,6 +37,7 @@ public enum NodeType {
   INTERSECT(IntersectNode.class),
   LIMIT(LimitNode.class),
   JOIN(JoinNode.class),
+  PARTITIONS_SCAN(PartitionedTableScanNode.class),
   PROJECTION(ProjectionNode.class),
   ROOT(LogicalRootNode.class),
   SCAN(ScanNode.class),

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
new file mode 100644
index 0000000..21f5ef4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/PartitionedTableScanNode.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.planner.PlanString;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.util.TUtil;
+
+public class PartitionedTableScanNode extends ScanNode {
+  @Expose Path [] inputPaths;
+
+  public PartitionedTableScanNode(int pid, ScanNode scanNode, Path[] inputPaths) {
+    super(pid, NodeType.PARTITIONS_SCAN, scanNode.getTableDesc());
+    this.setInSchema(scanNode.getInSchema());
+    this.setOutSchema(scanNode.getOutSchema());
+    this.alias = scanNode.alias;
+    this.renamedSchema = scanNode.renamedSchema;
+    this.qual = scanNode.qual;
+    this.targets = scanNode.targets;
+    this.inputPaths = inputPaths;
+  }
+
+  public void setInputPaths(Path [] paths) {
+    this.inputPaths = paths;
+  }
+
+  public Path [] getInputPaths() {
+    return inputPaths;
+  }
+	
+	public String toString() {
+	  StringBuilder sb = new StringBuilder();	  
+	  sb.append("\"Partitions Scan\" : {\"table\":\"")
+	  .append(getTableName()).append("\"");
+	  if (hasAlias()) {
+	    sb.append(",\"alias\": \"").append(alias);
+	  }
+	  
+	  if (hasQual()) {
+	    sb.append(", \"qual\": \"").append(this.qual).append("\"");
+	  }
+	  
+	  if (hasTargets()) {
+	    sb.append(", \"target list\": ");
+      boolean first = true;
+      for (Target target : targets) {
+        if (!first) {
+          sb.append(", ");
+        }
+        sb.append(target);
+        first = false;
+      }
+	  }
+
+    if (inputPaths != null) {
+      sb.append(", \"Partition paths\": ");
+      for (Path path : inputPaths) {
+        sb.append("\n ");
+        sb.append(path);
+      }
+      sb.append("\n");
+    }
+
+	  sb.append(",");
+	  sb.append("\n  \"out schema\": ").append(getOutSchema());
+	  sb.append("\n  \"in schema\": ").append(getInSchema());
+	  return sb.toString();
+	}
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(this.tableDesc, this.qual, this.targets);
+  }
+	
+	@Override
+	public boolean equals(Object obj) {
+	  if (obj instanceof PartitionedTableScanNode) {
+	    PartitionedTableScanNode other = (PartitionedTableScanNode) obj;
+	    
+	    boolean eq = super.equals(other); 
+	    eq = eq && TUtil.checkEquals(this.tableDesc, other.tableDesc);
+	    eq = eq && TUtil.checkEquals(this.qual, other.qual);
+	    eq = eq && TUtil.checkEquals(this.targets, other.targets);
+      eq = eq && TUtil.checkEquals(this.inputPaths, other.inputPaths);
+	    
+	    return eq;
+	  }	  
+	  
+	  return false;
+	}	
+	
+	@Override
+	public Object clone() throws CloneNotSupportedException {
+	  PartitionedTableScanNode unionScan = (PartitionedTableScanNode) super.clone();
+	  
+	  unionScan.tableDesc = (TableDesc) this.tableDesc.clone();
+	  
+	  if (hasQual()) {
+	    unionScan.qual = (EvalNode) this.qual.clone();
+	  }
+	  
+	  if (hasTargets()) {
+	    unionScan.targets = new Target[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        unionScan.targets[i] = (Target) targets[i].clone();
+      }
+	  }
+
+    unionScan.inputPaths = inputPaths;
+
+    return unionScan;
+	}
+	
+  @Override
+  public void preOrder(LogicalNodeVisitor visitor) {
+    visitor.visit(this);
+  }
+	
+	public void postOrder(LogicalNodeVisitor visitor) {        
+    visitor.visit(this);
+  }
+
+  @Override
+  public PlanString getPlanString() {
+    PlanString planStr = new PlanString("Scan on ").appendTitle(getTableName());
+    if (hasAlias()) {
+      planStr.appendTitle(" as ").appendTitle(alias);
+    }
+
+    if (hasQual()) {
+      planStr.addExplan("filter: ").appendExplain(this.qual.toString());
+    }
+
+    if (hasTargets()) {
+      planStr.addExplan("target list: ");
+      boolean first = true;
+      for (Target target : targets) {
+        if (!first) {
+          planStr.appendExplain(", ");
+        }
+        planStr.appendExplain(target.toString());
+        first = false;
+      }
+    }
+
+    if (inputPaths != null) {
+      planStr.addExplan("Path list: ");
+      int i = 0;
+      for (Path path : inputPaths) {
+        planStr.addExplan((i++) + ": ").appendExplain(path.toString());
+      }
+    }
+
+    planStr.addDetail("out schema: ").appendDetail(getOutSchema().toString());
+    planStr.addDetail("in schema: ").appendDetail(getInSchema().toString());
+
+    return planStr;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
index 6a30aa6..5ea61b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/RelationNode.java
@@ -24,7 +24,7 @@ public abstract class RelationNode extends LogicalNode {
 
   public RelationNode(int pid, NodeType nodeType) {
     super(pid, nodeType);
-    assert(nodeType == NodeType.SCAN || nodeType == NodeType.TABLE_SUBQUERY);
+    assert(nodeType == NodeType.SCAN || nodeType == NodeType.PARTITIONS_SCAN || nodeType == NodeType.TABLE_SUBQUERY);
   }
 
   public abstract String getTableName();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 04c7b5a..cd9c1f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -29,11 +29,16 @@ import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
 public class ScanNode extends RelationNode implements Projectable {
-	@Expose private TableDesc tableDesc;
-  @Expose private String alias;
-  @Expose private Schema renamedSchema;
-	@Expose private EvalNode qual;
-	@Expose private Target[] targets;
+	@Expose protected TableDesc tableDesc;
+  @Expose protected String alias;
+  @Expose protected Schema renamedSchema;
+	@Expose protected EvalNode qual;
+	@Expose protected Target[] targets;
+
+  protected ScanNode(int pid, NodeType nodeType, TableDesc desc) {
+    super(pid, nodeType);
+    this.tableDesc = desc;
+  }
 
   public ScanNode(int pid, TableDesc desc) {
     super(pid, NodeType.SCAN);
@@ -46,7 +51,7 @@ public class ScanNode extends RelationNode implements Projectable {
     this(pid, desc);
     this.alias = PlannerUtil.normalizeTableName(alias);
     renamedSchema = getOutSchema();
-    renamedSchema.setQualifier(this.alias, true);
+    renamedSchema.setQualifier(this.alias);
 	}
 	
 	public String getTableName() {
@@ -119,7 +124,7 @@ public class ScanNode extends RelationNode implements Projectable {
         first = false;
       }
 	  }
-	  
+
 	  sb.append(",");
 	  sb.append("\n  \"out schema\": ").append(getOutSchema());
 	  sb.append("\n  \"in schema\": ").append(getInSchema());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
index b2bd937..634fa3a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/StoreTableNode.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Options;
-import org.apache.tajo.catalog.partition.Partitions;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.util.TUtil;
 
@@ -39,17 +39,17 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
   @Expose private Options options;
   @Expose private boolean isCreatedTable = false;
   @Expose private boolean isOverwritten = false;
-  @Expose private Partitions partitions;
+  @Expose private PartitionDesc partitionDesc;
 
   public StoreTableNode(int pid, String tableName) {
     super(pid, NodeType.STORE);
     this.tableName = tableName;
   }
 
-  public StoreTableNode(int pid, String tableName, Partitions partitions) {
+  public StoreTableNode(int pid, String tableName, PartitionDesc partitionDesc) {
     super(pid, NodeType.STORE);
     this.tableName = tableName;
-    this.partitions = partitions;
+    this.partitionDesc = partitionDesc;
   }
 
   public final String getTableName() {
@@ -109,12 +109,12 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     return this.options;
   }
 
-  public Partitions getPartitions() {
-    return partitions;
+  public PartitionDesc getPartitions() {
+    return partitionDesc;
   }
 
-  public void setPartitions(Partitions partitions) {
-    this.partitions = partitions;
+  public void setPartitions(PartitionDesc partitionDesc) {
+    this.partitionDesc = partitionDesc;
   }
 
   @Override
@@ -146,7 +146,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
       eq = eq && TUtil.checkEquals(options, other.options);
       eq = eq && isCreatedTable == other.isCreatedTable;
       eq = eq && isOverwritten == other.isOverwritten;
-      eq = eq && TUtil.checkEquals(partitions, other.partitions);
+      eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
       return eq;
     } else {
       return false;
@@ -163,7 +163,7 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     store.options = options != null ? (Options) options.clone() : null;
     store.isCreatedTable = isCreatedTable;
     store.isOverwritten = isOverwritten;
-    store.partitions = partitions;
+    store.partitionDesc = partitionDesc;
     return store;
   }
 
@@ -188,8 +188,8 @@ public class StoreTableNode extends UnaryNode implements Cloneable {
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",")
     .append("\n  \"in schema\": ").append(getInSchema());
 
-    if(partitions != null) {
-      sb.append(partitions.toString());
+    if(partitionDesc != null) {
+      sb.append(partitionDesc.toString());
     }
 
     sb.append("}");

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
index d1f0986..335d12f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/TableSubQueryNode.java
@@ -35,7 +35,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
     this.tableName = PlannerUtil.normalizeTableName(tableName);
     this.subQuery = subQuery;
     setOutSchema((Schema) this.subQuery.getOutSchema().clone());
-    getOutSchema().setQualifier(this.tableName, true);
+    getOutSchema().setQualifier(this.tableName);
     setInSchema((Schema) this.subQuery.getInSchema().clone());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
index bd1b8d3..cbdad1a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/GreedyHeuristicJoinOrderAlgorithm.java
@@ -20,7 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
 
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.PlanningException;
@@ -90,7 +90,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
     joinNode.setInSchema(mergedSchema);
     joinNode.setOutSchema(mergedSchema);
     if (joinEdge.hasJoinQual()) {
-      joinNode.setJoinQual(EvalTreeUtil.transformCNF2Singleton(joinEdge.getJoinQual()));
+      joinNode.setJoinQual(AlgebraicUtil.createSingletonExprFromCNF(joinEdge.getJoinQual()));
     }
     return joinNode;
   }
@@ -206,7 +206,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
       double filterFactor = 1;
       if (joinNode.hasJoinQual()) {
         filterFactor = Math.pow(DEFAULT_SELECTION_FACTOR,
-            EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()).length);
+            AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()).length);
         return getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()) * filterFactor;
       } else {
         return Math.pow(getCost(joinNode.getLeftChild()) * getCost(joinNode.getRightChild()), 2);
@@ -215,7 +215,7 @@ public class GreedyHeuristicJoinOrderAlgorithm implements JoinOrderAlgorithm {
     case SELECTION:
       SelectionNode selectionNode = (SelectionNode) node;
       return getCost(selectionNode.getChild()) *
-          Math.pow(DEFAULT_SELECTION_FACTOR, EvalTreeUtil.getConjNormalForm(selectionNode.getQual()).length);
+          Math.pow(DEFAULT_SELECTION_FACTOR, AlgebraicUtil.toConjunctiveNormalFormArray(selectionNode.getQual()).length);
 
     case TABLE_SUBQUERY:
       TableSubQueryNode subQueryNode = (TableSubQueryNode) node;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
index 66c82f3..74ef38a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/join/JoinGraph.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.logical.join;
 
 import com.google.common.collect.Sets;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AlgebraicUtil;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.LogicalPlan;
@@ -35,7 +36,7 @@ import java.util.Set;
 public class JoinGraph extends SimpleUndirectedGraph<String, JoinEdge> {
   public Collection<EvalNode> addJoin(LogicalPlan plan, LogicalPlan.QueryBlock block,
                                       JoinNode joinNode) throws PlanningException {
-    Set<EvalNode> cnf = Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual()));
+    Set<EvalNode> cnf = Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual()));
     Set<EvalNode> nonJoinQuals = Sets.newHashSet();
     for (EvalNode singleQual : cnf) {
       if (PlannerUtil.isJoinQual(singleQual)) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
index f7db4bd..db7e566 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ColumnPartitionedTableStoreExec.java
@@ -28,7 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
@@ -45,6 +47,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
 /**
  * This class is a physical operator to store at column partitioned table.
  */
@@ -56,15 +60,11 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
   private Tuple tuple;
   private Path storeTablePath;
   private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();
-  private int[] columnIndexes;
-  private String[] columnNames;
-
+  private int[] partitionColumnIndices;
+  private String[] partitionColumnNames;
 
-  /**
-   * @throws java.io.IOException
-   *
-   */
-  public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException {
+  public ColumnPartitionedTableStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
+      throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
 
@@ -75,18 +75,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
 
+    // Rewrite a output schema because we don't have to store field values
+    // corresponding to partition key columns.
+    if (plan.getPartitions() != null && plan.getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+      rewriteColumnPartitionedTableSchema();
+    }
+
     // Find column index to name subpartition directory path
     if (this.plan.getPartitions() != null) {
       if (this.plan.getPartitions().getColumns() != null) {
-        columnIndexes = new int[plan.getPartitions().getColumns().size()];
-        columnNames = new String[columnIndexes.length];
-        for(int i = 0; i < plan.getPartitions().getColumns().size(); i++)  {
-          Column targetColumn = plan.getPartitions().getColumn(i);
+        partitionColumnIndices = new int[plan.getPartitions().getColumns().size()];
+        partitionColumnNames = new String[partitionColumnIndices.length];
+        Schema columnPartitionSchema = plan.getPartitions().getSchema();
+        for(int i = 0; i < columnPartitionSchema.getColumnNum(); i++)  {
+          Column targetColumn = columnPartitionSchema.getColumn(i);
           for(int j = 0; j < plan.getInSchema().getColumns().size();j++) {
             Column inputColumn = plan.getInSchema().getColumn(j);
             if (inputColumn.getColumnName().equals(targetColumn.getColumnName())) {
-              columnIndexes[i] = j;
-              columnNames[i] = targetColumn.getColumnName();
+              partitionColumnIndices[i] = j;
+              partitionColumnNames[i] = targetColumn.getColumnName();
             }
           }
         }
@@ -94,6 +101,25 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
     }
   }
 
+  /**
+   * This method rewrites an input schema of column-partitioned table because
+   * there are no actual field values in data file in a column-partitioned table.
+   * So, this method removes partition key columns from the input schema.
+   */
+  private void rewriteColumnPartitionedTableSchema() {
+    PartitionDesc partitionDesc = plan.getPartitions();
+    Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+    columnPartitionSchema.setQualifier(plan.getTableName());
+
+    Schema modifiedOutputSchema = new Schema();
+    for (Column column : outSchema.toArray()) {
+      if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
+        modifiedOutputSchema.addColumn(column);
+      }
+    }
+    outSchema = modifiedOutputSchema;
+  }
+
   public void init() throws IOException {
     super.init();
 
@@ -124,8 +150,7 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
         LOG.info("File size: " + status.getLen());
       }
 
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
-          outSchema, dataFile);
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
       appender.enableStats();
       appender.init();
       appenderMap.put(partition, appender);
@@ -148,12 +173,12 @@ public class ColumnPartitionedTableStoreExec extends UnaryPhysicalExec {
     while((tuple = child.next()) != null) {
       // set subpartition directory name
       sb.delete(0, sb.length());
-      if (columnIndexes != null) {
-        for(int i = 0; i < columnIndexes.length; i++) {
-          Datum datum = (Datum) tuple.get(columnIndexes[i]);
+      if (partitionColumnIndices != null) {
+        for(int i = 0; i < partitionColumnIndices.length; i++) {
+          Datum datum = tuple.get(partitionColumnIndices[i]);
           if(i > 0)
             sb.append("/");
-          sb.append(columnNames[i]).append("=");
+          sb.append(partitionColumnNames[i]).append("=");
           sb.append(datum.asChars());
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index b799095..4e6cd64 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -27,9 +27,9 @@ import java.io.IOException;
 
 public abstract class PhysicalExec implements SchemaObject {
   protected final TaskAttemptContext context;
-  protected final Schema inSchema;
-  protected final Schema outSchema;
-  protected final int outColumnNum;
+  protected Schema inSchema;
+  protected Schema outSchema;
+  protected int outColumnNum;
 
   public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
                       final Schema outSchema) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 5783080..d17f7ec 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,15 +18,16 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.ScanNode;
@@ -34,8 +35,11 @@ import org.apache.tajo.storage.*;
 
 import java.io.IOException;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionsType;
+
 public class SeqScanExec extends PhysicalExec {
   private final ScanNode plan;
   private Scanner scanner = null;
@@ -63,8 +67,67 @@ public class SeqScanExec extends PhysicalExec {
     }
   }
 
+  /**
+   * This method rewrites an input schema of column-partitioned table because
+   * there are no actual field values in data file in a column-partitioned table.
+   * So, this method removes partition key columns from the input schema.
+   *
+   * TODO - This implementation assumes that a fragment is always FileFragment.
+   * In the column partitioned table, a path has an important role to
+   * indicate partition keys. In this time, it is right. Later, we have to fix it.
+   */
+  private void rewriteColumnPartitionedTableSchema() throws IOException {
+    PartitionDesc partitionDesc = plan.getTableDesc().getPartitions();
+    Schema columnPartitionSchema = (Schema) partitionDesc.getSchema().clone();
+    List<FileFragment> fileFragments = FragmentConvertor.convert(FileFragment.class, fragments);
+
+    // Get a partition key value from a given path
+    Tuple partitionRow =
+        TupleUtil.buildTupleFromPartitionPath(columnPartitionSchema, fileFragments.get(0).getPath(), false);
+
+    // Remove partition key columns from an input schema.
+    columnPartitionSchema.setQualifier(inSchema.getColumn(0).getQualifier());
+    Schema modifiedInputSchema = new Schema();
+    for (Column column : inSchema.toArray()) {
+      if (columnPartitionSchema.getColumnByName(column.getColumnName()) == null) {
+        modifiedInputSchema.addColumn(column);
+      }
+    }
+    this.inSchema = modifiedInputSchema;
+
+    // Targets or search conditions may contain column references.
+    // However, actual values absent in tuples. So, Replace all column references by constant datum.
+    for (Column column : columnPartitionSchema.toArray()) {
+      FieldEval targetExpr = new FieldEval(column);
+      EvalContext evalContext = targetExpr.newContext();
+      targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
+      Datum datum = targetExpr.terminate(evalContext);
+      ConstEval constExpr = new ConstEval(datum);
+      for (Target target : plan.getTargets()) {
+        if (target.getEvalTree().equals(targetExpr)) {
+          if (!target.hasAlias()) {
+            target.setAlias(target.getEvalTree().getName());
+          }
+          target.setExpr(constExpr);
+        } else {
+          EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
+        }
+      }
+
+      if (plan.hasQual()) {
+        EvalTreeUtil.replace(plan.getQual(), targetExpr, constExpr);
+      }
+    }
+  }
+
   public void init() throws IOException {
     Schema projected;
+
+    if (plan.getTableDesc().hasPartitions()
+        && plan.getTableDesc().getPartitions().getPartitionsType() == PartitionsType.COLUMN) {
+      rewriteColumnPartitionedTableSchema();
+    }
+
     if (plan.hasTargets()) {
       projected = new Schema();
       Set<Column> columnSet = new HashSet<Column>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
index 7673253..817e48a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/FilterPushDownRule.java
@@ -22,10 +22,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
 import org.apache.tajo.engine.planner.LogicalPlan;
 import org.apache.tajo.engine.planner.PlannerUtil;
@@ -65,7 +62,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
   @Override
   public LogicalNode visitFilter(Set<EvalNode> cnf, LogicalPlan plan, SelectionNode selNode, Stack<LogicalNode> stack)
       throws PlanningException {
-    cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(selNode.getQual())));
+    cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
 
     stack.push(selNode);
     visitChild(cnf, plan, selNode.getChild(), stack);
@@ -178,15 +175,15 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
           EvalNode qual2 = null;
           if (matched2.size() > 1) {
              // merged into one eval tree
-             qual2 = EvalTreeUtil.transformCNF2Singleton(
-                        matched2.toArray(new EvalNode [matched2.size()]));
+             qual2 = AlgebraicUtil.createSingletonExprFromCNF(
+                 matched2.toArray(new EvalNode[matched2.size()]));
           } else if (matched2.size() == 1) {
              // if the number of matched expr is one
              qual2 = matched2.get(0);
           }
 
           if (qual2 != null) {
-             EvalNode conjQual2 = EvalTreeUtil.transformCNF2Singleton(joinNode.getJoinQual(), qual2);
+             EvalNode conjQual2 = AlgebraicUtil.createSingletonExprFromCNF(joinNode.getJoinQual(), qual2);
              joinNode.setJoinQual(conjQual2);
              cnf.removeAll(matched2);
           } // for the remaining cnf, push it as usual
@@ -194,7 +191,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
     }
 
     if (joinNode.hasJoinQual()) {
-      cnf.addAll(Sets.newHashSet(EvalTreeUtil.getConjNormalForm(joinNode.getJoinQual())));
+      cnf.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
     }
 
     visitChild(cnf, plan, left, stack);
@@ -210,7 +207,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
     EvalNode qual = null;
     if (matched.size() > 1) {
       // merged into one eval tree
-      qual = EvalTreeUtil.transformCNF2Singleton(
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
           matched.toArray(new EvalNode[matched.size()]));
     } else if (matched.size() == 1) {
       // if the number of matched expr is one
@@ -243,8 +240,8 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<Set<EvalNode>, L
     EvalNode qual = null;
     if (matched.size() > 1) {
       // merged into one eval tree
-      qual = EvalTreeUtil.transformCNF2Singleton(
-          matched.toArray(new EvalNode [matched.size()]));
+      qual = AlgebraicUtil.createSingletonExprFromCNF(
+          matched.toArray(new EvalNode[matched.size()]));
     } else if (matched.size() == 1) {
       // if the number of matched expr is one
       qual = matched.get(0);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
new file mode 100644
index 0000000..561424f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.rewrite;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.org.apache.commons.logging.Log;
+import com.sun.org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.*;
+import org.apache.tajo.engine.planner.BasicLogicalPlanVisitor;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.utils.TupleUtil;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class PartitionedTableRewriter implements RewriteRule {
+  private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class);
+
+  private static final String NAME = "Partitioned Table Rewriter";
+  private final Rewriter rewriter = new Rewriter();
+
+  private final TajoConf systemConf;
+
+  public PartitionedTableRewriter(TajoConf conf) {
+    systemConf = conf;
+  }
+
+  @Override
+  public String getName() {
+    return NAME;
+  }
+
+  @Override
+  public boolean isEligible(LogicalPlan plan) {
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      for (RelationNode relation : block.getRelations()) {
+        if (relation.getType() == NodeType.SCAN) {
+          TableDesc table = ((ScanNode)relation).getTableDesc();
+          if (table.hasPartitions()) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
+    boolean containsPartitionedTables;
+    for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
+      containsPartitionedTables = false;
+      for (RelationNode relation : block.getRelations()) {
+        if (relation.getType() == NodeType.SCAN) {
+          TableDesc table = ((ScanNode)relation).getTableDesc();
+          if (table.hasPartitions()) {
+            containsPartitionedTables = true;
+          }
+        }
+      }
+      if (containsPartitionedTables) {
+        rewriter.visitChild(block, plan, block.getRoot(), new Stack<LogicalNode>());
+      }
+    }
+    return plan;
+  }
+
+  private static class PartitionPathFilter implements PathFilter {
+    private FileSystem fs;
+    private Schema schema;
+    private EvalNode partitionFilter;
+    private EvalContext evalContext;
+
+
+    public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
+      this.schema = schema;
+      this.partitionFilter = partitionFilter;
+      evalContext = partitionFilter.newContext();
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      Tuple tuple = TupleUtil.buildTupleFromPartitionPath(schema, path, true);
+      if (tuple == null) { // if it is a file or not acceptable file
+        return false;
+      }
+      partitionFilter.eval(evalContext, schema, tuple);
+      return partitionFilter.terminate(evalContext).asBool();
+    }
+
+    @Override
+    public String toString() {
+      return partitionFilter.toString();
+    }
+  }
+
+  /**
+   * It assumes that each conjunctive form corresponds to one column.
+   *
+   * @param partitionColumns
+   * @param conjunctiveForms search condition corresponding to partition columns.
+   *                         If it is NULL, it means that there is no search condition for this table.
+   * @param tablePath
+   * @return
+   * @throws IOException
+   */
+  private Path [] findFilteredPaths(Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath)
+      throws IOException {
+
+    FileSystem fs = tablePath.getFileSystem(systemConf);
+
+    PathFilter [] filters;
+    if (conjunctiveForms == null) {
+      filters = buildAllAcceptingPathFilters(partitionColumns);
+    } else {
+      filters = buildPathFiltersForAllLevels(partitionColumns, conjunctiveForms);
+    }
+
+    // loop from one to the number of partition columns
+    Path [] filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0]));
+
+    for (int i = 1; i < partitionColumns.getColumnNum(); i++) {
+      // Get all file status matched to a ith level path filter.
+      filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i]));
+    }
+
+    return filteredPaths;
+  }
+
+  /**
+   * Build path filters for all levels with a list of filter conditions.
+   *
+   * For example, consider you have a partitioned table for three columns (i.e., col1, col2, col3).
+   * Then, this methods will create three path filters for (col1), (col1, col2), (col1, col2, col3).
+   *
+   * Corresponding filter conditions will be placed on each path filter,
+   * If there is no corresponding expression for certain column,
+   * The condition will be filled with a true value.
+   *
+   * Assume that an user gives a condition WHERE col1 ='A' and col3 = 'C'.
+   * There is no filter condition corresponding to col2.
+   * Then, the path filter conditions are corresponding to the followings:
+   *
+   * The first path filter: col1 = 'A'
+   * The second path filter: col1 = 'A' AND col2 IS NOT NULL
+   * The third path filter: col1 = 'A' AND col2 IS NOT NULL AND col3 = 'C'
+   *
+   * 'IS NOT NULL' predicate is always true against the partition path.
+   *
+   * @param partitionColumns
+   * @param conjunctiveForms
+   * @return
+   */
+  private static PathFilter [] buildPathFiltersForAllLevels(Schema partitionColumns,
+                                                     EvalNode [] conjunctiveForms) {
+    // Building partition path filters for all levels
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.getColumnNum()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.getColumnNum(); i++) { // loop from one to level
+      target = partitionColumns.getColumn(i);
+
+      for (EvalNode expr : conjunctiveForms) {
+        if (EvalTreeUtil.findDistinctRefColumns(expr).contains(target)) {
+          // Accumulate one qual per level
+          accumulatedFilters.add(expr);
+        }
+      }
+
+      if (accumulatedFilters.size() < (i + 1)) {
+        accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+      }
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  /**
+   * Build an array of path filters for all levels with all accepting filter condition.
+   * @param partitionColumns The partition columns schema
+   * @return The array of path filter, accpeting all partition paths.
+   */
+  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+    Column target;
+    PathFilter [] filters = new PathFilter[partitionColumns.getColumnNum()];
+    List<EvalNode> accumulatedFilters = Lists.newArrayList();
+    for (int i = 0; i < partitionColumns.getColumnNum(); i++) { // loop from one to level
+      target = partitionColumns.getColumn(i);
+      accumulatedFilters.add(new IsNullEval(true, new FieldEval(target)));
+
+      EvalNode filterPerLevel = AlgebraicUtil.createSingletonExprFromCNF(
+          accumulatedFilters.toArray(new EvalNode[accumulatedFilters.size()]));
+      filters[i] = new PartitionPathFilter(partitionColumns, filterPerLevel);
+    }
+    return filters;
+  }
+
+  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+    Path [] paths = new Path[fileStatuses.length];
+    for (int j = 0; j < fileStatuses.length; j++) {
+      paths[j] = fileStatuses[j].getPath();
+    }
+    return paths;
+  }
+
+  private Path [] findFilteredPartitionPaths(ScanNode scanNode) throws IOException {
+    TableDesc table = scanNode.getTableDesc();
+    FileSystem fs = table.getPath().getFileSystem(systemConf);
+    LOG.info("Partitioned Table Dir: " + table.getPath());
+    LOG.info("Summary: " + fs.getContentSummary(table.getPath()).getDirectoryCount());
+    PartitionDesc partitionDesc = scanNode.getTableDesc().getPartitions();
+
+    Schema paritionValuesSchema = new Schema();
+    for (Column column : partitionDesc.getColumns()) {
+      paritionValuesSchema.addColumn(column);
+    }
+
+    Set<EvalNode> indexablePredicateSet = Sets.newHashSet();
+
+    // if a query statement has a search condition, try to find indexable predicates
+    if (scanNode.hasQual()) {
+      EvalNode [] conjunctiveForms = AlgebraicUtil.toConjunctiveNormalFormArray(scanNode.getQual());
+      Set<EvalNode> remainExprs = Sets.newHashSet(conjunctiveForms);
+
+      // add qualifier to schema for qual
+      paritionValuesSchema.setQualifier(scanNode.getCanonicalName());
+      for (Column column : paritionValuesSchema.getColumns()) {
+        for (EvalNode simpleExpr : conjunctiveForms) {
+          if (checkIfIndexablePredicateOnTargetColumn(simpleExpr, column)) {
+            indexablePredicateSet.add(simpleExpr);
+          }
+        }
+      }
+
+      // Partitions which are not matched to the partition filter conditions are pruned immediately.
+      // So, the partition filter conditions are not necessary later, and they are removed from
+      // original search condition for simplicity and efficiency.
+      remainExprs.removeAll(indexablePredicateSet);
+      if (remainExprs.isEmpty()) {
+        scanNode.setQual(null);
+      } else {
+        scanNode.setQual(
+            AlgebraicUtil.createSingletonExprFromCNF(remainExprs.toArray(new EvalNode[remainExprs.size()])));
+      }
+    }
+
+    if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates
+      return findFilteredPaths(paritionValuesSchema,
+          indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), table.getPath());
+    } else { // otherwise, we will get all partition paths.
+      return findFilteredPaths(paritionValuesSchema, null, table.getPath());
+    }
+  }
+
+  private boolean checkIfIndexablePredicateOnTargetColumn(EvalNode evalNode, Column targetColumn) {
+    if (checkIfIndexablePredicate(evalNode) || checkIfDisjunctiveButOneVariable(evalNode)) {
+      Set<Column> variables = EvalTreeUtil.findDistinctRefColumns(evalNode);
+      // if it contains only single variable matched to a target column
+      return variables.size() == 1 && variables.contains(targetColumn);
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Check if an expression consists of one variable and one constant and
+   * the expression is a comparison operator.
+   *
+   * @param evalNode The expression to be checked
+   * @return true if an expression consists of one variable and one constant
+   * and the expression is a comparison operator. Other, false.
+   */
+  private boolean checkIfIndexablePredicate(EvalNode evalNode) {
+    // TODO - LIKE with a trailing wild-card character and IN with an array can be indexable
+    return AlgebraicUtil.containSingleVar(evalNode) && AlgebraicUtil.isIndexableOperator(evalNode);
+  }
+
+  /**
+   *
+   * @param evalNode The expression to be checked
+   * @return true if an disjunctive expression, consisting of indexable expressions
+   */
+  private boolean checkIfDisjunctiveButOneVariable(EvalNode evalNode) {
+    if (evalNode.getType() == EvalType.OR) {
+      boolean indexable =
+          checkIfIndexablePredicate(evalNode.getLeftExpr()) &&
+              checkIfIndexablePredicate(evalNode.getRightExpr());
+
+      boolean sameVariable =
+          EvalTreeUtil.findDistinctRefColumns(evalNode.getLeftExpr())
+          .equals(EvalTreeUtil.findDistinctRefColumns(evalNode.getRightExpr()));
+
+      return indexable && sameVariable;
+    } else {
+      return false;
+    }
+  }
+
+  private void updateTableStat(PartitionedTableScanNode scanNode) throws PlanningException {
+    if (scanNode.getInputPaths().length > 0) {
+      try {
+        FileSystem fs = scanNode.getInputPaths()[0].getFileSystem(systemConf);
+        long totalVolume = 0;
+
+        for (Path input : scanNode.getInputPaths()) {
+          ContentSummary summary = fs.getContentSummary(input);
+          totalVolume += summary.getLength();
+          totalVolume += summary.getFileCount();
+        }
+        scanNode.getTableDesc().getStats().setNumBytes(totalVolume);
+      } catch (IOException e) {
+        throw new PlanningException(e);
+      }
+    }
+  }
+
+  private final class Rewriter extends BasicLogicalPlanVisitor<LogicalPlan.QueryBlock, Object> {
+    @Override
+    public Object visitScan(LogicalPlan.QueryBlock block, LogicalPlan plan, ScanNode scanNode, Stack<LogicalNode> stack)
+        throws PlanningException {
+
+      TableDesc table = scanNode.getTableDesc();
+      if (!table.hasPartitions()) {
+        return null;
+      }
+
+      try {
+        Path [] filteredPaths = findFilteredPartitionPaths(scanNode);
+        plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions");
+        PartitionedTableScanNode rewrittenScanNode = new PartitionedTableScanNode(plan.newPID(), scanNode, filteredPaths);
+        updateTableStat(rewrittenScanNode);
+        PlannerUtil.replaceNode(plan, stack.peek(), scanNode, rewrittenScanNode);
+      } catch (IOException e) {
+        throw new PlanningException("Partitioned Table Rewrite Failed: \n" + e.getMessage());
+      }
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/f58f6ee8/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index d0e9a6f..403b403 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -304,7 +304,7 @@ public class ProjectionPushDownRule extends BasicLogicalPlanVisitor<ProjectionPu
     LogicalNode child = visitChild(newContext, plan, subRoot, newStack);
     newStack.pop();
     Schema inSchema = (Schema) child.getOutSchema().clone();
-    inSchema.setQualifier(node.getCanonicalName(), true);
+    inSchema.setQualifier(node.getCanonicalName());
     node.setInSchema(inSchema);
     return pushDownProjectablePost(context, node, isTopmostProjectable(stack));
   }