You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/30 13:55:22 UTC

[3/4] tajo git commit: TAJO-269: Protocol buffer De/Serialization for LogicalNode.

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index 2d9b1f8..0df4001 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -26,18 +26,18 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.QueryStatus;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.engine.utils.test.ErrorInjectionRewriter;
 import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.plan.rewrite.BaseLogicalPlanRewriteRuleProvider;
+import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.ResultSet;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TimeZone;
+import java.util.*;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.*;
@@ -416,11 +416,23 @@ public class TestSelectQuery extends QueryTestCaseBase {
     cleanupQuery(res);
   }
 
+  public static class RulesForErrorInjection extends BaseLogicalPlanRewriteRuleProvider { // provider class name that testQueryMasterTaskInitError sets on all workers
+    public RulesForErrorInjection(TajoConf conf) {
+      super(conf);
+    }
+
+    @Override
+    public Collection<Class<? extends LogicalPlanRewriteRule>> getPostRules() {
+      List<Class<? extends LogicalPlanRewriteRule>> addedRules = Lists.newArrayList(super.getPostRules()); // mutable copy of the inherited post rules
+      return addedRules; // NOTE(review): nothing is appended to the copy, so no error-injecting rule is registered here - confirm this is intended
+    }
+  }
+
   @Test
   public final void testQueryMasterTaskInitError() throws Exception {
     // In this testcase we can check that a TajoClient receives QueryMasterTask's init error message.
-    testingCluster.setAllWorkersConfValue("tajo.plan.rewriter.classes",
-        ErrorInjectionRewriter.class.getCanonicalName());
+    testingCluster.setAllWorkersConfValue(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS.name(),
+        RulesForErrorInjection.class.getCanonicalName());
 
     try {
       // If client can't receive error status, thread runs forever.
@@ -450,7 +462,8 @@ public class TestSelectQuery extends QueryTestCaseBase {
       // If query runs more than 10 secs, test is fail.
       assertFalse(t.isAlive());
     } finally {
-      testingCluster.setAllWorkersConfValue("tajo.plan.rewriter.classes", "");
+      // recover the rewrite rule provider to default
+      testingCluster.setAllWorkersConfValue(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS.name(), "");
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
index 455213b..1be21e4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTruncateTable.java
@@ -25,8 +25,10 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import java.sql.ResultSet;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @Category(IntegrationTest.class)
 public class TestTruncateTable extends QueryTestCaseBase {
@@ -63,11 +65,6 @@ public class TestTruncateTable extends QueryTestCaseBase {
     }
   }
 
-
-  /*
-  Currently TajoClient can't throw exception when plan error.
-  The following test cast should be uncommented after https://issues.apache.org/jira/browse/TAJO-762
-
   @Test
   public final void testTruncateExternalTable() throws Exception {
     try {
@@ -100,5 +97,4 @@ public class TestTruncateTable extends QueryTestCaseBase {
       executeString("DROP TABLE truncate_table2 PURGE");
     }
   }
-  */
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
index 2af5ce9..668ba70 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestWindowQuery.java
@@ -272,7 +272,8 @@ public class TestWindowQuery extends QueryTestCaseBase {
     TajoTestingCluster.createTable("firstvaluetime", schema, tableOptions, data, 1);
 
     try {
-      ResultSet res = executeString("select id, first_value(time) over ( partition by id order by time ) as time_first from firstvaluetime");
+      ResultSet res = executeString(
+          "select id, first_value(time) over ( partition by id order by time ) as time_first from firstvaluetime");
       String ascExpected = "id,time_first\n" +
           "-------------------------------\n" +
           "1,12:11:12\n" +
@@ -306,7 +307,8 @@ public class TestWindowQuery extends QueryTestCaseBase {
     TajoTestingCluster.createTable("lastvaluetime", schema, tableOptions, data, 1);
 
     try {
-      ResultSet res = executeString("select id, last_value(time) over ( partition by id order by time ) as time_last from lastvaluetime");
+      ResultSet res = executeString(
+          "select id, last_value(time) over ( partition by id order by time ) as time_last from lastvaluetime");
       String ascExpected = "id,time_last\n" +
           "-------------------------------\n" +
           "1,12:11:12\n" +

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index c908737..d0f7cf4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -126,9 +126,9 @@ public class TestGlobalPlanner {
 
   private MasterPlan buildPlan(String sql) throws PlanningException, IOException {
     Expr expr = sqlAnalyzer.parse(sql);
-    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(util.getConfiguration()), expr);
-    optimizer.optimize(plan);
-    QueryContext context = new QueryContext(util.getConfiguration());
+    QueryContext context = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
+    LogicalPlan plan = planner.createPlan(context, expr);
+    optimizer.optimize(context, plan);
     MasterPlan masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), context, plan);
     globalPlanner.build(masterPlan);
     return masterPlan;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
index 750e64e..18a8859 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalOptimizer.java
@@ -18,16 +18,19 @@
 
 package org.apache.tajo.plan;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.tajo.ConfigKey;
 import org.apache.tajo.OverridableConf;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.util.ReflectionUtil;
 import org.apache.tajo.util.graph.DirectedGraphCursor;
 import org.apache.tajo.plan.expr.AlgebraicUtil;
 import org.apache.tajo.plan.expr.EvalNode;
@@ -37,14 +40,10 @@ import org.apache.tajo.plan.joinorder.JoinGraph;
 import org.apache.tajo.plan.joinorder.JoinOrderAlgorithm;
 import org.apache.tajo.plan.logical.*;
 import org.apache.tajo.plan.rewrite.*;
-import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule;
-import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
-import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 
 import java.util.LinkedHashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.Stack;
 
@@ -58,47 +57,36 @@ import static org.apache.tajo.plan.joinorder.GreedyHeuristicJoinOrderAlgorithm.g
 public class LogicalOptimizer {
   private static final Log LOG = LogFactory.getLog(LogicalOptimizer.class.getName());
 
-  private BasicQueryRewriteEngine rulesBeforeJoinOpt;
-  private BasicQueryRewriteEngine rulesAfterToJoinOpt;
+  private BaseLogicalPlanRewriteEngine rulesBeforeJoinOpt;
+  private BaseLogicalPlanRewriteEngine rulesAfterToJoinOpt;
   private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm();
 
-  public LogicalOptimizer(TajoConf systemConf) {
-    rulesBeforeJoinOpt = new BasicQueryRewriteEngine();
-    if (systemConf.getBoolVar(ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED)) {
-      rulesBeforeJoinOpt.addRewriteRule(new FilterPushDownRule());
-    }
+  public LogicalOptimizer(TajoConf conf) {
 
-    rulesAfterToJoinOpt = new BasicQueryRewriteEngine();
-    rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule());
-    rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf));
-
-    // Currently, it is only used for some test cases to inject exception manually.
-    String userDefinedRewriterClass = systemConf.get("tajo.plan.rewriter.classes");
-    if (userDefinedRewriterClass != null && !userDefinedRewriterClass.isEmpty()) {
-      for (String eachRewriterClass : userDefinedRewriterClass.split(",")) {
-        try {
-          RewriteRule rule = (RewriteRule) Class.forName(eachRewriterClass).newInstance();
-          rulesAfterToJoinOpt.addRewriteRule(rule);
-        } catch (Exception e) {
-          LOG.error("Can't initiate a Rewriter object: " + eachRewriterClass, e);
-          continue;
-        }
-      }
-    }
+    Class clazz = conf.getClassVar(ConfVars.LOGICAL_PLAN_REWRITE_RULE_PROVIDER_CLASS);
+    LogicalPlanRewriteRuleProvider provider = (LogicalPlanRewriteRuleProvider) ReflectionUtil.newInstance(clazz, conf);
+
+    rulesBeforeJoinOpt = new BaseLogicalPlanRewriteEngine();
+    rulesBeforeJoinOpt.addRewriteRule(provider.getPreRules());
+    rulesAfterToJoinOpt = new BaseLogicalPlanRewriteEngine();
+    rulesAfterToJoinOpt.addRewriteRule(provider.getPostRules());
   }
 
-  public void addRuleAfterToJoinOpt(RewriteRule rewriteRule) {
+  public void addRuleAfterToJoinOpt(LogicalPlanRewriteRule rewriteRule) {
     if (rewriteRule != null) {
       rulesAfterToJoinOpt.addRewriteRule(rewriteRule);
     }
   }
 
+  @VisibleForTesting
   public LogicalNode optimize(LogicalPlan plan) throws PlanningException {
-    return optimize(null, plan);
+    OverridableConf conf = new OverridableConf(new TajoConf(),
+        ConfigKey.ConfigType.SESSION, ConfigKey.ConfigType.QUERY, ConfigKey.ConfigType.SYSTEM);
+    return optimize(conf, plan);
   }
 
   public LogicalNode optimize(OverridableConf context, LogicalPlan plan) throws PlanningException {
-    rulesBeforeJoinOpt.rewrite(plan);
+    rulesBeforeJoinOpt.rewrite(context, plan);
 
     DirectedGraphCursor<String, BlockEdge> blockCursor =
         new DirectedGraphCursor<String, BlockEdge>(plan.getQueryBlockGraph(), plan.getRootBlock().getName());
@@ -111,7 +99,7 @@ public class LogicalOptimizer {
     } else {
       LOG.info("Skip Join Optimized.");
     }
-    rulesAfterToJoinOpt.rewrite(plan);
+    rulesAfterToJoinOpt.rewrite(context, plan);
     return plan.getRootBlock().getRoot();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
index 7c29099..544f83a 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanPreprocessor.java
@@ -112,7 +112,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanner.P
         throw new NoSuchColumnException(CatalogUtil.buildFQName(qualifier, "*"));
       }
 
-      Schema schema = relationOp.getTableSchema();
+      Schema schema = relationOp.getLogicalSchema();
       Column[] resolvedColumns = new Column[schema.size()];
       return schema.getColumns().toArray(resolvedColumns);
     } else { // if a column reference is not qualified
@@ -123,7 +123,7 @@ public class LogicalPlanPreprocessor extends BaseAlgebraVisitor<LogicalPlanner.P
 
       while (iterator.hasNext()) {
         relationOp = iterator.next();
-        schema = relationOp.getTableSchema();
+        schema = relationOp.getLogicalSchema();
         resolvedColumns.addAll(schema.getColumns());
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index 1a426e0..eebee6f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -144,6 +144,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     // Add Root Node
     LogicalRootNode root = plan.createNode(LogicalRootNode.class);
+
     root.setInSchema(topMostNode.getOutSchema());
     root.setChild(topMostNode);
     root.setOutSchema(topMostNode.getOutSchema());
@@ -257,9 +258,9 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     // Set ProjectionNode
     projectionNode = context.queryBlock.getNodeFromExpr(projection);
-    projectionNode.setInSchema(child.getOutSchema());
-    projectionNode.setTargets(targets);
+    projectionNode.init(projection.isDistinct(), targets);
     projectionNode.setChild(child);
+    projectionNode.setInSchema(child.getOutSchema());
 
     if (projection.isDistinct() && block.hasNode(NodeType.GROUP_BY)) {
       throw new VerifyException("Cannot support grouping and distinct at the same time yet");
@@ -521,7 +522,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     } else if (projectable instanceof RelationNode) {
       RelationNode relationNode = (RelationNode) projectable;
-      verifyIfTargetsCanBeEvaluated(relationNode.getTableSchema(), (Projectable) relationNode);
+      verifyIfTargetsCanBeEvaluated(relationNode.getLogicalSchema(), (Projectable) relationNode);
 
     } else {
       verifyIfTargetsCanBeEvaluated(projectable.getInSchema(), projectable);
@@ -1300,7 +1301,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   private static LinkedHashSet<Target> createFieldTargetsFromRelation(QueryBlock block, RelationNode relationNode,
                                                       Set<String> newlyEvaluatedRefNames) {
     LinkedHashSet<Target> targets = Sets.newLinkedHashSet();
-    for (Column column : relationNode.getTableSchema().getColumns()) {
+    for (Column column : relationNode.getLogicalSchema().getColumns()) {
       String aliasName = block.namedExprsMgr.checkAndGetIfAliasedColumn(column.getQualifiedName());
       if (aliasName != null) {
         targets.add(new Target(new FieldEval(column), aliasName));
@@ -1569,7 +1570,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     }
 
     if (child instanceof Projectable) {
-      Projectable projectionNode = (Projectable) insertNode.getChild();
+      Projectable projectionNode = (Projectable)insertNode.getChild();
 
       // Modifying projected columns by adding NULL constants
       // It is because that table appender does not support target columns to be written.
@@ -2017,7 +2018,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       return false;
     }
 
-    if (columnRefs.size() > 0 && !node.getTableSchema().containsAll(columnRefs)) {
+    if (columnRefs.size() > 0 && !node.getLogicalSchema().containsAll(columnRefs)) {
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java b/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java
index f49a93d..a5c39b8 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/Target.java
@@ -20,17 +20,20 @@ package org.apache.tajo.plan;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.plan.expr.FieldEval;
+import org.apache.tajo.plan.serder.LogicalNodeSerializer;
 import org.apache.tajo.plan.serder.PlanGsonHelper;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.util.TUtil;
 
 /**
  * A Target contains how to evaluate an expression and its alias name.
  */
-public class Target implements Cloneable, GsonObject {
+public class Target implements Cloneable, GsonObject, ProtoObject<PlanProto.Target> {
   @Expose private EvalNode expr;
   @Expose private Column column;
   @Expose private String alias = null;
@@ -46,8 +49,7 @@ public class Target implements Cloneable, GsonObject {
     String normalized = alias;
 
     // If an expr is a column reference and its alias is equivalent to column name, ignore a given alias.
-    if (eval instanceof FieldEval
-        && eval.getName().equals(normalized)) {
+    if (eval instanceof FieldEval && eval.getName().equals(normalized)) {
       column = ((FieldEval) eval).getColumnRef();
     } else {
       column = new Column(normalized, eval.getValueType());
@@ -127,4 +129,9 @@ public class Target implements Cloneable, GsonObject {
   public String toJson() {
     return PlanGsonHelper.toJson(this, Target.class);
   }
+
+  @Override
+  public PlanProto.Target getProto() { // protobuf form of this target
+    return LogicalNodeSerializer.convertTarget(this); // conversion is delegated to LogicalNodeSerializer
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
index 542eae8..ca8c110 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/AggregationFunctionCallEval.java
@@ -27,13 +27,14 @@ import org.apache.tajo.plan.function.AggFunction;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.util.TUtil;
 
 public class AggregationFunctionCallEval extends FunctionEval implements Cloneable {
-  @Expose protected AggFunction instance;
   @Expose boolean intermediatePhase = false;
   @Expose boolean finalPhase = true;
   @Expose String alias;
 
+  protected AggFunction instance;
   private Tuple params;
 
   protected AggregationFunctionCallEval(EvalType type, FunctionDesc desc, AggFunction instance, EvalNode[] givenArgs) {
@@ -91,6 +92,10 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
     }
   }
 
+  public boolean hasAlias() { // true when an alias has been assigned
+    return this.alias != null;
+  }
+
   public void setAlias(String alias) { this.alias = alias; }
 
   public String getAlias() { return  this.alias; }
@@ -106,6 +111,22 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
     return clone;
   }
 
+  public boolean isIntermediatePhase() { // current value of the intermediatePhase flag
+    return intermediatePhase;
+  }
+
+  public void setIntermediatePhase(boolean flag) { // sets only intermediatePhase; finalPhase is left as it is
+    this.intermediatePhase = flag;
+  }
+
+  public void setFinalPhase(boolean flag) { // sets only finalPhase; intermediatePhase is left as it is
+    this.finalPhase = flag;
+  }
+
+  public boolean isFinalPhase() { // current value of the finalPhase flag
+    return finalPhase;
+  }
+
   public void setFirstPhase() {
     this.finalPhase = false;
     this.intermediatePhase = false;
@@ -120,4 +141,19 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
     this.finalPhase = false;
     this.intermediatePhase = true;
   }
+
+  public boolean equals(Object obj) { // equal when the inherited state, function instance, both phase flags and alias all match
+    if (obj instanceof AggregationFunctionCallEval) {
+      AggregationFunctionCallEval other = (AggregationFunctionCallEval) obj;
+      // Compare instance null-safely: a bare instance.equals(...) throws NullPointerException if this.instance is null.
+      boolean eq = super.equals(other);
+      eq &= TUtil.checkEquals(instance, other.instance);
+      eq &= intermediatePhase == other.intermediatePhase;
+      eq &= finalPhase == other.finalPhase;
+      eq &= TUtil.checkEquals(alias, other.alias);
+      return eq;
+    }
+    // Any other type (including null) is never equal.
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
index 638383a..dcb7285 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/EvalNode.java
@@ -20,17 +20,20 @@ package org.apache.tajo.plan.expr;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.plan.serder.EvalNodeSerializer;
 import org.apache.tajo.plan.serder.PlanGsonHelper;
+import org.apache.tajo.plan.serder.PlanProto;
 import org.apache.tajo.storage.Tuple;
 
 /**
  * An annotated expression which includes actual data domains.
  * It is also used for evaluation.
  */
-public abstract class EvalNode implements Cloneable, GsonObject {
+public abstract class EvalNode implements Cloneable, GsonObject, ProtoObject<PlanProto.EvalNodeTree> {
 	@Expose protected EvalType type;
 
   public EvalNode() {
@@ -71,4 +74,9 @@ public abstract class EvalNode implements Cloneable, GsonObject {
     evalNode.type = type;
     return evalNode;
   }
+
+  @Override
+  public PlanProto.EvalNodeTree getProto() { // protobuf form of this eval node
+    return EvalNodeSerializer.serialize(this); // conversion is delegated to EvalNodeSerializer
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
index 84b4a45..0ff5927 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/expr/WindowFunctionEval.java
@@ -86,6 +86,17 @@ public class WindowFunctionEval extends AggregationFunctionCallEval implements C
     return funcDesc.getReturnType();
   }
 
+  public boolean equals(Object obj) { // equal when the inherited aggregation state, sort specs and window frame all match
+    if (obj instanceof WindowFunctionEval) {
+      WindowFunctionEval other = (WindowFunctionEval) obj;
+      boolean eq = super.equals(other); // without this, different functions over the same window spec compared equal
+      eq &= TUtil.checkEquals(sortSpecs, other.sortSpecs);
+      eq &= TUtil.checkEquals(windowFrame, other.windowFrame);
+      return eq;
+    }
+    return false; // any other type (including null) is never equal
+  }
+
   @Override
   public Object clone() throws CloneNotSupportedException {
     WindowFunctionEval windowFunctionEval = (WindowFunctionEval) super.clone();
@@ -95,6 +106,7 @@ public class WindowFunctionEval extends AggregationFunctionCallEval implements C
         windowFunctionEval.sortSpecs[i] = (SortSpec) sortSpecs[i].clone();
       }
     }
+    windowFunctionEval.windowFrame = windowFrame.clone();
     return windowFunctionEval;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
index e9e2467..e926dce 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTableNode.java
@@ -43,6 +43,16 @@ public class AlterTableNode extends LogicalNode {
     super(pid, NodeType.ALTER_TABLE);
   }
 
+  @Override
+  public int childNum() { // this node reports no children
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) { // idx is ignored
+    return null; // always null for any index; no exception is thrown
+  }
+
   public String getTableName() {
     return tableName;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTablespaceNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTablespaceNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTablespaceNode.java
index 7b79cc1..8b68dbe 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTablespaceNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/AlterTablespaceNode.java
@@ -23,9 +23,6 @@ import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.plan.PlanString;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.LogicalNodeVisitor;
-import org.apache.tajo.plan.logical.NodeType;
 
 public class AlterTablespaceNode extends LogicalNode implements Cloneable {
 
@@ -38,6 +35,16 @@ public class AlterTablespaceNode extends LogicalNode implements Cloneable {
     super(pid, NodeType.ALTER_TABLESPACE);
   }
 
+  @Override
+  public int childNum() { // this node reports no children
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) { // idx is ignored
+    return null; // always null for any index; no exception is thrown
+  }
+
   public String getTablespaceName() {
     return tablespaceName;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java
index 709ef34..70b1bc4 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/BinaryNode.java
@@ -28,6 +28,22 @@ public abstract class BinaryNode extends LogicalNode implements Cloneable, GsonO
 	public BinaryNode(int pid, NodeType nodeType) {
 		super(pid, nodeType);
 	}
+
+  @Override
+  public int childNum() { // always two slots: left and right
+    return 2;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) { // index 0 is the left child, index 1 the right child
+    if (idx == 0) {
+      return leftChild;
+    } else if (idx == 1) {
+      return rightChild;
+    } else {
+      throw new ArrayIndexOutOfBoundsException(idx); // any other index is rejected with an exception
+    }
+  }
 	
 	public <T extends LogicalNode> T getLeftChild() {
 		return (T) this.leftChild;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateDatabaseNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateDatabaseNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateDatabaseNode.java
index e3f73fe..28bd4cd 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateDatabaseNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateDatabaseNode.java
@@ -30,6 +30,16 @@ public class CreateDatabaseNode extends LogicalNode implements Cloneable {
     super(pid, NodeType.CREATE_DATABASE);
   }
 
+  @Override
+  public int childNum() { // this node reports no children
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) { // idx is ignored
+    return null; // always null for any index; no exception is thrown
+  }
+
   public void init(String databaseName, boolean ifNotExists) {
     this.databaseName = databaseName;
     this.ifNotExists = ifNotExists;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
index d03da6a..0976ab5 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/CreateTableNode.java
@@ -99,11 +99,12 @@ public class CreateTableNode extends StoreTableNode implements Cloneable {
   public boolean equals(Object obj) {
     if (obj instanceof CreateTableNode) {
       CreateTableNode other = (CreateTableNode) obj;
-      return super.equals(other)
-          && this.schema.equals(other.schema)
-          && this.external == other.external
-          && TUtil.checkEquals(path, other.path)
-          && ifNotExists == other.ifNotExists;
+      boolean eq = super.equals(other);
+      eq &= this.schema.equals(other.schema);
+      eq &= this.external == other.external;
+      eq &= TUtil.checkEquals(path, other.path);
+      eq &= ifNotExists == other.ifNotExists;;
+      return eq;
     } else {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java
index e31e488..a40ad59 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DistinctGroupbyNode.java
@@ -34,19 +34,19 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
   private GroupbyNode groupbyPlan;
 
   @Expose
-  private List<GroupbyNode> groupByNodes;
+  private List<GroupbyNode> subGroupbyPlan;
 
   @Expose
   private Target[] targets;
 
   @Expose
-  private Column[] groupingColumns;
+  private Column[] groupingColumns = PlannerUtil.EMPTY_COLUMNS;
 
   @Expose
-  private int[] resultColumnIds;
+  private int[] resultColumnIds = new int[]{};
 
   /** Aggregation Functions */
-  @Expose private AggregationFunctionCallEval[] aggrFunctions;
+  @Expose private AggregationFunctionCallEval[] aggrFunctions = PlannerUtil.EMPTY_AGG_FUNCS;
 
   public DistinctGroupbyNode(int pid) {
     super(pid, NodeType.DISTINCT_GROUP_BY);
@@ -54,7 +54,7 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
 
   @Override
   public boolean hasTargets() {
-    return targets != null && targets.length > 0;
+    return targets.length > 0;
   }
 
   @Override
@@ -72,19 +72,19 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
     }
   }
 
-  public void setGroupbyNodes(List<GroupbyNode> groupByNodes) {
-    this.groupByNodes =  groupByNodes;
+  public void setSubPlans(List<GroupbyNode> groupByNodes) {
+    this.subGroupbyPlan =  groupByNodes;
   }
 
-  public List<GroupbyNode> getGroupByNodes() {
-    return groupByNodes;
+  public List<GroupbyNode> getSubPlans() {
+    return subGroupbyPlan;
   }
 
   public final Column[] getGroupingColumns() {
     return groupingColumns;
   }
 
-  public final void setGroupColumns(Column[] groupingColumns) {
+  public final void setGroupingColumns(Column[] groupingColumns) {
     this.groupingColumns = groupingColumns;
   }
 
@@ -119,12 +119,12 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
       }
     }
 
-    if (groupByNodes != null) {
-      cloneNode.groupByNodes = new ArrayList<GroupbyNode>();
-      for (GroupbyNode eachNode: groupByNodes) {
+    if (subGroupbyPlan != null) {
+      cloneNode.subGroupbyPlan = new ArrayList<GroupbyNode>();
+      for (GroupbyNode eachNode: subGroupbyPlan) {
         GroupbyNode groupbyNode = (GroupbyNode)eachNode.clone();
         groupbyNode.setPID(-1);
-        cloneNode.groupByNodes.add(groupbyNode);
+        cloneNode.subGroupbyPlan.add(groupbyNode);
       }
     }
 
@@ -151,7 +151,7 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
       sb.append("grouping set=").append(TUtil.arrayToString(groupingColumns));
       sb.append(", ");
     }
-    for (GroupbyNode eachNode: groupByNodes) {
+    for (GroupbyNode eachNode: subGroupbyPlan) {
       sb.append(", groupbyNode=").append(eachNode.toString());
     }
     sb.append(")");
@@ -164,8 +164,9 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
       DistinctGroupbyNode other = (DistinctGroupbyNode) obj;
       boolean eq = super.equals(other);
       eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
-      eq = eq && TUtil.checkEquals(groupByNodes, other.groupByNodes);
+      eq = eq && TUtil.checkEquals(subGroupbyPlan, other.subGroupbyPlan);
       eq = eq && TUtil.checkEquals(targets, other.targets);
+      eq = eq && TUtil.checkEquals(resultColumnIds, other.resultColumnIds);
       return eq;
     } else {
       return false;
@@ -194,7 +195,7 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
     sb.append("(");
 
     String prefix = "";
-    for (GroupbyNode eachNode: groupByNodes) {
+    for (GroupbyNode eachNode: subGroupbyPlan) {
       if (eachNode.hasAggFunctions()) {
         AggregationFunctionCallEval[] aggrFunctions = eachNode.getAggFunctions();
         for (int j = 0; j < aggrFunctions.length; j++) {
@@ -218,7 +219,7 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
     planStr.addDetail("out schema:").appendDetail(getOutSchema().toString());
     planStr.addDetail("in schema:").appendDetail(getInSchema().toString());
 
-    for (GroupbyNode eachNode: groupByNodes) {
+    for (GroupbyNode eachNode: subGroupbyPlan) {
       planStr.addDetail("\t").appendDetail("distinct: " + eachNode.isDistinct())
           .appendDetail(", " + eachNode.getShortPlanString());
     }
@@ -236,7 +237,7 @@ public class DistinctGroupbyNode extends UnaryNode implements Projectable, Clone
         }
       }
     }
-    for (GroupbyNode eachGroupbyNode: groupByNodes) {
+    for (GroupbyNode eachGroupbyNode: subGroupbyPlan) {
       if (eachGroupbyNode.getGroupingColumns() != null && eachGroupbyNode.getGroupingColumns().length > 0) {
         for (Column eachColumn: eachGroupbyNode.getGroupingColumns()) {
           if (!shuffleKeyColumns.contains(eachColumn)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropDatabaseNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropDatabaseNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropDatabaseNode.java
index b88c384..c566bf5 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropDatabaseNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropDatabaseNode.java
@@ -24,12 +24,22 @@ import org.apache.tajo.plan.PlanString;
 
 public class DropDatabaseNode extends LogicalNode implements Cloneable {
   @Expose private String databaseName;
-  @Expose private boolean ifExists;
+  @Expose private boolean ifExists = false;
 
   public DropDatabaseNode(int pid) {
     super(pid, NodeType.DROP_DATABASE);
   }
 
+  @Override
+  public int childNum() {
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    return null;
+  }
+
   public void init(String databaseName, boolean ifExists) {
     this.databaseName = databaseName;
     this.ifExists = ifExists;
@@ -55,7 +65,10 @@ public class DropDatabaseNode extends LogicalNode implements Cloneable {
   public boolean equals(Object obj) {
     if (obj instanceof DropDatabaseNode) {
       DropDatabaseNode other = (DropDatabaseNode) obj;
-      return super.equals(other) && this.databaseName.equals(other.databaseName) && ifExists == other.ifExists;
+      boolean eq = super.equals(other);
+      eq &= this.databaseName.equals(other.databaseName);
+      eq &= ifExists == other.ifExists;
+      return eq;
     } else {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropTableNode.java
index 1a61852..5bde21b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/DropTableNode.java
@@ -30,6 +30,16 @@ public class DropTableNode extends LogicalNode implements Cloneable {
     super(pid, NodeType.DROP_TABLE);
   }
 
+  @Override
+  public int childNum() {
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    return null;
+  }
+
   public void init(String tableName, boolean ifExists, boolean purge) {
     this.tableName = tableName;
     this.ifExists = ifExists;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java
index 2519165..0f96575 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/EvalExprNode.java
@@ -35,6 +35,16 @@ public class EvalExprNode extends LogicalNode implements Projectable {
   }
 
   @Override
+  public int childNum() {
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    return null;
+  }
+
+  @Override
   public boolean hasTargets() {
     return true;
   }
@@ -42,7 +52,7 @@ public class EvalExprNode extends LogicalNode implements Projectable {
   @Override
   public void setTargets(Target[] targets) {
     this.exprs = targets;
-    setOutSchema(PlannerUtil.targetToSchema(targets));
+    this.setOutSchema(PlannerUtil.targetToSchema(targets));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java
index 2c74ce3..4a18cb4 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/GroupbyNode.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.plan.logical;
 
+import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.plan.PlanString;
@@ -27,10 +28,10 @@ import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.util.TUtil;
 
 public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
-	/** Grouping key sets */
-  @Expose private Column [] groupingColumns;
+  /** Grouping key sets */
+  @Expose private Column [] groupingKeys = PlannerUtil.EMPTY_COLUMNS;
   /** Aggregation Functions */
-  @Expose private AggregationFunctionCallEval [] aggrFunctions;
+  @Expose private AggregationFunctionCallEval [] aggrFunctions = PlannerUtil.EMPTY_AGG_FUNCS;
   /**
    * It's a list of targets. The grouping columns should be followed by aggregation functions.
    * aggrFunctions keep actual aggregation functions, but it only contains field references.
@@ -42,16 +43,20 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     super(pid, NodeType.GROUP_BY);
   }
 
+  public int groupingKeyNum() {
+    return groupingKeys.length;
+  }
+
   public final boolean isEmptyGrouping() {
-    return groupingColumns == null || groupingColumns.length == 0;
+    return groupingKeys.length == 0;
   }
 
-  public void setGroupingColumns(Column [] groupingColumns) {
-    this.groupingColumns = groupingColumns;
+  public void setGroupingColumns(Column [] groupingKeys) {
+    this.groupingKeys = groupingKeys;
   }
 
 	public final Column [] getGroupingColumns() {
-	  return this.groupingColumns;
+	  return this.groupingKeys;
 	}
 
   public final boolean isDistinct() {
@@ -63,7 +68,11 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   }
 
   public boolean hasAggFunctions() {
-    return this.aggrFunctions != null;
+    return aggrFunctions.length > 0;
+  }
+
+  public int aggregationFunctionNum() {
+    return this.aggrFunctions.length;
   }
 
   public AggregationFunctionCallEval[] getAggFunctions() {
@@ -71,6 +80,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   }
 
   public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+    Preconditions.checkNotNull(evals);
     this.aggrFunctions = evals;
   }
 
@@ -96,8 +106,8 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   
   public String toString() {
     StringBuilder sb = new StringBuilder("GroupBy (");
-    if (groupingColumns != null || groupingColumns.length > 0) {
-      sb.append("grouping set=").append(TUtil.arrayToString(groupingColumns));
+    if (groupingKeys != null || groupingKeys.length > 0) {
+      sb.append("grouping set=").append(TUtil.arrayToString(groupingKeys));
       sb.append(", ");
     }
     if (hasAggFunctions()) {
@@ -112,7 +122,8 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     if (obj instanceof GroupbyNode) {
       GroupbyNode other = (GroupbyNode) obj;
       boolean eq = super.equals(other);
-      eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
+      eq = eq && isDistinct() == other.isDistinct();
+      eq = eq && TUtil.checkEquals(groupingKeys, other.groupingKeys);
       eq = eq && TUtil.checkEquals(aggrFunctions, other.aggrFunctions);
       eq = eq && TUtil.checkEquals(targets, other.targets);
       return eq;
@@ -124,10 +135,10 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     GroupbyNode grp = (GroupbyNode) super.clone();
-    if (groupingColumns != null) {
-      grp.groupingColumns = new Column[groupingColumns.length];
-      for (int i = 0; i < groupingColumns.length; i++) {
-        grp.groupingColumns[i] = groupingColumns[i];
+    if (groupingKeys != null) {
+      grp.groupingKeys = new Column[groupingKeys.length];
+      for (int i = 0; i < groupingKeys.length; i++) {
+        grp.groupingKeys[i] = groupingKeys[i];
       }
     }
 
@@ -151,7 +162,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   public String getShortPlanString() {
     StringBuilder sb = new StringBuilder();
     sb.append(getType().name() + "(" + getPID() + ")").append("(");
-    Column [] groupingColumns = this.groupingColumns;
+    Column [] groupingColumns = this.groupingKeys;
     for (int j = 0; j < groupingColumns.length; j++) {
       sb.append(groupingColumns[j].getSimpleName());
       if(j < groupingColumns.length - 1) {
@@ -196,7 +207,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
 
     StringBuilder sb = new StringBuilder();
     sb.append("(");
-    Column [] groupingColumns = this.groupingColumns;
+    Column [] groupingColumns = this.groupingKeys;
     for (int j = 0; j < groupingColumns.length; j++) {
       sb.append(groupingColumns[j].getSimpleName());
       if(j < groupingColumns.length - 1) {
@@ -243,7 +254,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
    * If so, it returns TRUE. Otherwise, it returns FALSE.
    */
   public boolean isAggregationColumn(String simpleName) {
-    for (int i = groupingColumns.length; i < targets.length; i++) {
+    for (int i = groupingKeys.length; i < targets.length; i++) {
       if (simpleName.equals(targets[i].getNamedColumn().getSimpleName()) ||
           simpleName.equals(targets[i].getAlias())) {
         return true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
index d1d8582..769cb59 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/InsertNode.java
@@ -95,6 +95,10 @@ public class InsertNode extends StoreTableNode implements Cloneable {
     this.targetSchema = schema;
   }
 
+  public boolean hasProjectedSchema() {
+    return this.projectedSchema != null;
+  }
+
   public Schema getProjectedSchema() {
     return this.projectedSchema;
   }
@@ -123,11 +127,12 @@ public class InsertNode extends StoreTableNode implements Cloneable {
   public boolean equals(Object obj) {
     if (obj instanceof InsertNode) {
       InsertNode other = (InsertNode) obj;
-      return super.equals(other)
-          && this.overwrite == other.overwrite
-          && TUtil.checkEquals(this.tableSchema, other.tableSchema)
-          && TUtil.checkEquals(this.targetSchema, other.targetSchema)
-          && TUtil.checkEquals(path, other.path);
+      boolean eq = super.equals(other);
+      eq &= this.overwrite == other.overwrite;
+      eq &= TUtil.checkEquals(this.tableSchema, other.tableSchema);
+      eq &= TUtil.checkEquals(this.targetSchema, other.targetSchema);
+      eq &= TUtil.checkEquals(path, other.path);
+      return eq;
     } else {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java
index c42a05e..200977b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/LogicalNode.java
@@ -30,24 +30,24 @@ import org.apache.tajo.plan.serder.PlanGsonHelper;
 import org.apache.tajo.util.TUtil;
 
 public abstract class LogicalNode implements Cloneable, GsonObject {
-  @Expose private int pid;
+  @Expose private int nodeId;
   @Expose private NodeType type;
 	@Expose private Schema inputSchema;
 	@Expose	private Schema outputSchema;
 
 	@Expose	private double cost = 0;
 
-	protected LogicalNode(int pid, NodeType type) {
-    this.pid = pid;
+	protected LogicalNode(int nodeId, NodeType type) {
+    this.nodeId = nodeId;
     this.type = type;
 	}
 
   public int getPID() {
-    return pid;
+    return nodeId;
   }
 
   public void setPID(int pid) {
-    this.pid = pid;
+    this.nodeId = pid;
   }
 	
 	public NodeType getType() {
@@ -58,6 +58,10 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
 		this.type = type;
 	}
 
+  public abstract int childNum();
+
+  public abstract LogicalNode getChild(int idx);
+
 	public double getCost() {
 		return this.cost;
 	}
@@ -105,7 +109,7 @@ public abstract class LogicalNode implements Cloneable, GsonObject {
 	@Override
 	public Object clone() throws CloneNotSupportedException {
 	  LogicalNode node = (LogicalNode)super.clone();
-    node.pid = pid;
+    node.nodeId = nodeId;
 	  node.type = type;
 	  node.inputSchema =  (Schema) (inputSchema != null ? inputSchema.clone() : null);
 	  node.outputSchema = (Schema) (outputSchema != null ? outputSchema.clone() : null);

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/NodeType.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/NodeType.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/NodeType.java
index 9f01de9..75ae3b7 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/NodeType.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/NodeType.java
@@ -32,22 +32,22 @@ public enum NodeType {
   EXPRS(EvalExprNode.class),
   PROJECTION(ProjectionNode.class),
   LIMIT(LimitNode.class),
+  WINDOW_AGG(WindowAggNode.class),
   SORT(SortNode.class),
   HAVING(HavingNode.class),
+  DISTINCT_GROUP_BY(DistinctGroupbyNode.class),
   GROUP_BY(GroupbyNode.class),
-  WINDOW_AGG(WindowAggNode.class),
   SELECTION(SelectionNode.class),
   JOIN(JoinNode.class),
   UNION(UnionNode.class),
-  EXCEPT(ExceptNode.class),
   INTERSECT(IntersectNode.class),
+  EXCEPT(ExceptNode.class),
   TABLE_SUBQUERY(TableSubQueryNode.class),
   SCAN(ScanNode.class),
   PARTITIONS_SCAN(PartitionedTableScanNode.class),
   BST_INDEX_SCAN(IndexScanNode.class),
   STORE(StoreTableNode.class),
   INSERT(InsertNode.class),
-  DISTINCT_GROUP_BY(DistinctGroupbyNode.class),
 
   CREATE_DATABASE(CreateDatabaseNode.class),
   DROP_DATABASE(DropDatabaseNode.class),

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java
index 4ef7e2d..c0b5953 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ProjectionNode.java
@@ -25,16 +25,26 @@ import org.apache.tajo.plan.Target;
 import org.apache.tajo.util.TUtil;
 
 public class ProjectionNode extends UnaryNode implements Projectable {
+
+  @Expose private boolean distinct = false;
   /**
    * the targets are always filled even if the query is 'select *'
    */
   @Expose	private Target [] targets;
-  @Expose private boolean distinct = false;
 
 	public ProjectionNode(int pid) {
 		super(pid, NodeType.PROJECTION);
 	}
 
+  public void init(boolean distinct, Target [] targets) {
+    this.distinct = distinct;
+    this.targets = targets;
+  }
+
+  public boolean isDistinct() {
+    return distinct;
+  }
+
   public boolean hasTargets() {
     return this.targets != null;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
index fd8e937..7e335b0 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/RelationNode.java
@@ -45,5 +45,5 @@ public abstract class RelationNode extends LogicalNode {
 
   public abstract String getCanonicalName();
 
-  public abstract Schema getTableSchema();
+  public abstract Schema getLogicalSchema();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
index 3e4abe3..a22f592 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/ScanNode.java
@@ -42,6 +42,16 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod
     super(pid, nodeType);
   }
 
+  @Override
+  public int childNum() {
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    return null;
+  }
+
   public ScanNode(int pid) {
     super(pid, NodeType.SCAN);
   }
@@ -101,8 +111,7 @@ public class ScanNode extends RelationNode implements Projectable, SelectableNod
     }
   }
 
-  @Override
-  public Schema getTableSchema() {
+  public Schema getLogicalSchema() {
     return logicalSchema;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SetSessionNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SetSessionNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SetSessionNode.java
index ba5f83e..117315f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SetSessionNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/SetSessionNode.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.plan.logical;
 
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.PlanString;
 
 public class SetSessionNode extends LogicalNode {
@@ -29,6 +30,13 @@ public class SetSessionNode extends LogicalNode {
     super(pid, NodeType.SET_SESSION);
   }
 
+  /**
+   * If both name and value are given, it will set a session variable.
+   * If only a name is given, it will unset a session variable.
+   *
+   * @param name Session variable name
+   * @param value Session variable value
+   */
   public void init(String name, String value) {
     this.name = name;
     this.value = value;
@@ -38,8 +46,8 @@ public class SetSessionNode extends LogicalNode {
     return name;
   }
 
-  public boolean isDefaultValue() {
-    return value == null;
+  public boolean hasValue() {
+    return value != null;
   }
 
   public String getValue() {
@@ -47,6 +55,16 @@ public class SetSessionNode extends LogicalNode {
   }
 
   @Override
+  public int childNum() {
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    throw new UnsupportedException();
+  }
+
+  @Override
   public void preOrder(LogicalNodeVisitor visitor) {
     visitor.visit(this);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
index 0623d21..730eb35 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/StoreTableNode.java
@@ -39,6 +39,15 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
     return tableName != null;
   }
 
+  /**
+   * Check if a table name is specified.
+   *
+   * @return FALSE if this node is used for 'INSERT INTO LOCATION'. Otherwise, it will be TRUE.
+   */
+  public boolean hasTableName() {
+    return tableName != null;
+  }
+
   public void setTableName(String tableName) {
     this.tableName = tableName;
   }
@@ -73,7 +82,7 @@ public class StoreTableNode extends PersistentStoreNode implements Cloneable {
     if (obj instanceof StoreTableNode) {
       StoreTableNode other = (StoreTableNode) obj;
       boolean eq = super.equals(other);
-      eq = eq && this.tableName.equals(other.tableName);
+      eq = eq && TUtil.checkEquals(this.tableName, other.tableName);
       eq = eq && TUtil.checkEquals(partitionDesc, other.partitionDesc);
       return eq;
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
index 4e5f41c..4e9bd5c 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TableSubQueryNode.java
@@ -35,6 +35,16 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
     super(pid, NodeType.TABLE_SUBQUERY);
   }
 
+  @Override
+  public int childNum() {
+    return 1;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    return subQuery;
+  }
+
   public void init(String tableName, LogicalNode subQuery) {
     this.tableName = tableName;
     if (subQuery != null) {
@@ -66,7 +76,7 @@ public class TableSubQueryNode extends RelationNode implements Projectable {
   }
 
   @Override
-  public Schema getTableSchema() {
+  public Schema getLogicalSchema() {
     // an output schema can be determined by targets. So, an input schema of
     // TableSubQueryNode is only eligible for table schema.
     //

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TruncateTableNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TruncateTableNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TruncateTableNode.java
index 10c65b6..0166ef8 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TruncateTableNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/TruncateTableNode.java
@@ -32,6 +32,16 @@ public class TruncateTableNode extends LogicalNode {
     super(pid, NodeType.TRUNCATE_TABLE);
   }
 
+  @Override
+  public int childNum() {
+    return 0;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    return null;
+  }
+
   public List<String> getTableNames() {
     return tableNames;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/UnaryNode.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/UnaryNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/UnaryNode.java
index 0fc5c37..16a7f1b 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/UnaryNode.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/UnaryNode.java
@@ -31,6 +31,20 @@ public abstract class UnaryNode extends LogicalNode implements Cloneable {
 	public UnaryNode(int pid, NodeType type) {
 		super(pid, type);
 	}
+
+  @Override
+  public int childNum() {
+    return 1;
+  }
+
+  @Override
+  public LogicalNode getChild(int idx) {
+    if (idx == 0) {
+      return child;
+    } else {
+      throw new ArrayIndexOutOfBoundsException(idx);
+    }
+  }
 	
 	public void setChild(LogicalNode subNode) {
 		this.child = subNode;

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowSpec.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowSpec.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowSpec.java
index 73f4e13..cdae68f 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowSpec.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/WindowSpec.java
@@ -73,7 +73,7 @@ public class WindowSpec {
     return Objects.hashCode(windowName, partitionKeys, windowFrame);
   }
 
-  public static class WindowFrame {
+  public static class WindowFrame implements Cloneable {
     @Expose private WindowStartBound startBound;
     @Expose private WindowEndBound endBound;
     @Expose org.apache.tajo.algebra.WindowSpec.WindowFrameUnit unit; // TODO - to be supported
@@ -83,12 +83,8 @@ public class WindowSpec {
       this.endBound = new WindowEndBound(WindowFrameEndBoundType.UNBOUNDED_FOLLOWING);
     }
 
-    public WindowFrame(WindowStartBound startBound) {
-      this.startBound = startBound;
-    }
-
     public WindowFrame(WindowStartBound startBound, WindowEndBound endBound) {
-      this(startBound);
+      this.startBound = startBound;
       this.endBound = endBound;
     }
 
@@ -120,21 +116,29 @@ public class WindowSpec {
     public boolean equals(Object obj) {
       if (obj instanceof WindowFrame) {
         WindowFrame another = (WindowFrame) obj;
-        return
-            TUtil.checkEquals(startBound, another.startBound) &&
-            TUtil.checkEquals(endBound, another.endBound) &&
-            TUtil.checkEquals(unit, another.unit);
+        boolean eq = TUtil.checkEquals(startBound, another.startBound);
+        eq &= TUtil.checkEquals(endBound, another.endBound);
+        eq &= TUtil.checkEquals(unit, another.unit);
+        return eq;
       } else {
         return false;
       }
     }
 
+    public WindowFrame clone() throws CloneNotSupportedException {
+      WindowFrame newFrame = (WindowFrame) super.clone();
+      newFrame.startBound = startBound.clone();
+      newFrame.endBound = endBound.clone();
+      newFrame.unit = unit;
+      return newFrame;
+    }
+
     public int hashCode() {
       return Objects.hashCode(startBound, endBound, unit);
     }
   }
 
-  public static class WindowStartBound {
+  public static class WindowStartBound implements Cloneable {
     @Expose private WindowFrameStartBoundType boundType;
     @Expose private EvalNode number;
 
@@ -158,7 +162,9 @@ public class WindowSpec {
     public boolean equals(Object obj) {
       if (obj instanceof WindowStartBound) {
         WindowStartBound other = (WindowStartBound) obj;
-        return boundType == other.boundType && number.equals(other.number);
+        boolean eq = boundType == other.boundType;
+        eq &= TUtil.checkEquals(number, other.number);
+        return eq;
       } else {
         return false;
       }
@@ -168,9 +174,19 @@ public class WindowSpec {
     public int hashCode() {
       return Objects.hashCode(boundType, number);
     }
+
+    @Override
+    public WindowStartBound clone() throws CloneNotSupportedException {
+      WindowStartBound newStartBound = (WindowStartBound) super.clone();
+      newStartBound.boundType = boundType;
+      if (number != null) {
+        newStartBound.number = (EvalNode) number.clone();
+      }
+      return newStartBound;
+    }
   }
 
-  public static class WindowEndBound {
+  public static class WindowEndBound implements Cloneable {
     @Expose private WindowFrameEndBoundType boundType;
     @Expose private EvalNode number;
 
@@ -192,9 +208,11 @@ public class WindowSpec {
 
     @Override
     public boolean equals(Object obj) {
-      if (obj instanceof WindowStartBound) {
+      if (obj instanceof WindowEndBound) {
         WindowEndBound other = (WindowEndBound) obj;
-        return boundType == other.boundType && number.equals(other.number);
+        boolean eq = boundType == other.boundType;
+        eq &= TUtil.checkEquals(number, other.number);
+        return eq;
       } else {
         return false;
       }
@@ -204,5 +222,14 @@ public class WindowSpec {
     public int hashCode() {
       return Objects.hashCode(boundType, number);
     }
+
+    public WindowEndBound clone() throws CloneNotSupportedException {
+      WindowEndBound newEndBound = (WindowEndBound) super.clone();
+      newEndBound.boundType = boundType;
+      if (number != null) {
+        newEndBound.number = (EvalNode) number.clone();
+      }
+      return newEndBound;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
index 51a016f..44d3263 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/NameResolver.java
@@ -128,7 +128,7 @@ public abstract class NameResolver {
             CatalogUtil.buildFQName(relationOp.getCanonicalName(), CatalogUtil.extractSimpleName(canonicalName));
       }
 
-      Schema schema = relationOp.getTableSchema();
+      Schema schema = relationOp.getLogicalSchema();
       Column column = schema.getColumn(canonicalName);
 
       return column;
@@ -173,7 +173,7 @@ public abstract class NameResolver {
     List<Column> candidates = TUtil.newList();
 
     for (RelationNode rel : block.getRelations()) {
-      Column found = rel.getTableSchema().getColumn(columnRef.getName());
+      Column found = rel.getLogicalSchema().getColumn(columnRef.getName());
       if (found != null) {
         candidates.add(found);
       }
@@ -201,7 +201,7 @@ public abstract class NameResolver {
     for (LogicalPlan.QueryBlock eachBlock : plan.getQueryBlocks()) {
 
       for (RelationNode rel : eachBlock.getRelations()) {
-        Column found = rel.getTableSchema().getColumn(columnRef.getName());
+        Column found = rel.getLogicalSchema().getColumn(columnRef.getName());
         if (found != null) {
           candidates.add(found);
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
index a1d9dbd..19f39dd 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/nameresolver/ResolverByLegacy.java
@@ -74,7 +74,7 @@ public class ResolverByLegacy extends NameResolver {
     Schema currentNodeSchema = null;
     if (currentNode != null) {
       if (currentNode instanceof RelationNode) {
-        currentNodeSchema = ((RelationNode) currentNode).getTableSchema();
+        currentNodeSchema = ((RelationNode) currentNode).getLogicalSchema();
       } else {
         currentNodeSchema = currentNode.getInSchema();
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java
new file mode 100644
index 0000000..19c254b
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteEngine.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * This is a basic query rewrite rule engine. This rewrite rule engine
+ * rewrites a logical plan with various query rewrite rules.
+ */
+public class BaseLogicalPlanRewriteEngine implements LogicalPlanRewriteEngine {
+  /** class logger */
+  private Log LOG = LogFactory.getLog(BaseLogicalPlanRewriteEngine.class);
+
+  /** a map for query rewrite rules  */
+  private Map<String, LogicalPlanRewriteRule> rewriteRules = new LinkedHashMap<String, LogicalPlanRewriteRule>();
+
+  /**
+   * Instantiate each of the given query rewrite rule classes and add it to this engine.
+   *
+   * @param rules Rule classes
+   */
+  public void addRewriteRule(Iterable<Class<? extends LogicalPlanRewriteRule>> rules) {
+    for (Class<? extends LogicalPlanRewriteRule> clazz : rules) {
+      try {
+        LogicalPlanRewriteRule rule = clazz.newInstance();
+        addRewriteRule(rule);
+      } catch (Throwable t) {
+        throw new RuntimeException(t);
+      }
+    }
+  }
+
+  /**
+   * Add a query rewrite rule to this engine.
+   *
+   * @param rule The rule to be added to this engine.
+   */
+  public void addRewriteRule(LogicalPlanRewriteRule rule) {
+    if (!rewriteRules.containsKey(rule.getName())) {
+      rewriteRules.put(rule.getName(), rule);
+    }
+  }
+
+  /**
+   * Rewrite a logical plan with all query rewrite rules added to this engine.
+   *
+   * @param plan The plan to be rewritten with all query rewrite rules.
+   * @return The rewritten plan.
+   */
+  public LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException {
+    LogicalPlanRewriteRule rule;
+    for (Entry<String, LogicalPlanRewriteRule> rewriteRule : rewriteRules.entrySet()) {
+      rule = rewriteRule.getValue();
+      if (rule.isEligible(queryContext, plan)) {
+        plan = rule.rewrite(queryContext, plan);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query.");
+        }
+      }
+    }
+
+    return plan;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
new file mode 100644
index 0000000..eb96149
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BaseLogicalPlanRewriteRuleProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.rewrite.rules.FilterPushDownRule;
+import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
+import org.apache.tajo.plan.rewrite.rules.ProjectionPushDownRule;
+import org.apache.tajo.util.TUtil;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Default RewriteRuleProvider
+ */
+@SuppressWarnings("unused")
+public class BaseLogicalPlanRewriteRuleProvider extends LogicalPlanRewriteRuleProvider {
+
+  public BaseLogicalPlanRewriteRuleProvider(TajoConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Collection<Class<? extends LogicalPlanRewriteRule>> getPreRules() {
+    List<Class<? extends LogicalPlanRewriteRule>> rules = TUtil.newList();
+
+    if (systemConf.getBoolVar(TajoConf.ConfVars.$TEST_FILTER_PUSHDOWN_ENABLED)) {
+      rules.add(FilterPushDownRule.class);
+    }
+
+    return rules;
+  }
+
+  @Override
+  public Collection<Class<? extends LogicalPlanRewriteRule>> getPostRules() {
+    List<Class<? extends LogicalPlanRewriteRule>> rules = TUtil.newList(
+        ProjectionPushDownRule.class,
+        PartitionedTableRewriter.class
+    );
+    return rules;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
deleted file mode 100644
index 491dda1..0000000
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/BasicQueryRewriteEngine.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.plan.rewrite;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * This is a basic query rewrite rule engine. This rewrite rule engine
- * rewrites a logical plan with various query rewrite rules.
- */
-public class BasicQueryRewriteEngine implements QueryRewriteEngine {
-  /** class logger */
-  private Log LOG = LogFactory.getLog(BasicQueryRewriteEngine.class);
-
-  /** a map for query rewrite rules  */
-  private Map<String, RewriteRule> rewriteRules = new LinkedHashMap<String, RewriteRule>();
-
-  /**
-   * Add a query rewrite rule to this engine.
-   *
-   * @param rule The rule to be added to this engine.
-   */
-  public void addRewriteRule(RewriteRule rule) {
-    if (!rewriteRules.containsKey(rule.getName())) {
-      rewriteRules.put(rule.getName(), rule);
-    }
-  }
-
-  /**
-   * Rewrite a logical plan with all query rewrite rules added to this engine.
-   *
-   * @param plan The plan to be rewritten with all query rewrite rule.
-   * @return The rewritten plan.
-   */
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-    RewriteRule rule;
-    for (Entry<String, RewriteRule> rewriteRule : rewriteRules.entrySet()) {
-      rule = rewriteRule.getValue();
-      if (rule.isEligible(plan)) {
-        plan = rule.rewrite(plan);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("The rule \"" + rule.getName() + " \" rewrites the query.");
-        }
-      }
-    }
-
-    return plan;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteEngine.java
new file mode 100644
index 0000000..267d651
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteEngine.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+public interface LogicalPlanRewriteEngine {
+  /**
+   * Rewrite a logical plan with all query rewrite rules added to this engine.
+   *
+   * @param plan The plan to be rewritten with all query rewrite rules.
+   * @return The rewritten plan.
+   */
+  LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRule.java
new file mode 100644
index 0000000..2f0652b
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRule.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.plan.LogicalPlan;
+import org.apache.tajo.plan.PlanningException;
+
+/**
+ * An interface for a rewrite rule.
+ */
+public interface LogicalPlanRewriteRule {
+
+  /**
+   * It returns the rewrite rule name. It will be used for debugging and
+   * building an optimization history.
+   *
+   * @return The rewrite rule name
+   */
+  String getName();
+
+  /**
+   * This method checks if this rewrite rule can be applied to a given query plan.
+   * For example, the selection push down cannot be applied to a query plan without any filter.
+   * In such a case, it will return false.
+   *
+   * @param plan The plan to be checked
+   * @return True if this rule can be applied to a given plan. Otherwise, false.
+   */
+  boolean isEligible(OverridableConf queryContext, LogicalPlan plan);
+
+  /**
+   * Updates a logical plan and returns an updated logical plan rewritten by this rule.
+   * It must be guaranteed that the input logical plan is not modified even after rewrite.
+   * In other words, the rewrite has to modify a plan copied from the input plan.
+   *
+   * @param plan Input logical plan. It will not be modified.
+   * @return The rewritten logical plan.
+   */
+  LogicalPlan rewrite(OverridableConf queryContext, LogicalPlan plan) throws PlanningException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleProvider.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleProvider.java
new file mode 100644
index 0000000..934549e
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanRewriteRuleProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import org.apache.tajo.conf.TajoConf;
+
+import java.util.Collection;
+
+public abstract class LogicalPlanRewriteRuleProvider {
+  protected final TajoConf systemConf;
+
+  public LogicalPlanRewriteRuleProvider(TajoConf systemConf) {
+    this.systemConf = systemConf;
+  }
+
+  /**
+   * It returns RewriteRule classes which should be executed before join ordering.
+   *
+   * @return RewriteRule classes
+   */
+  public abstract Collection<Class<? extends LogicalPlanRewriteRule>> getPreRules();
+  /**
+   * It returns RewriteRule classes which should be executed after join ordering.
+   *
+   * @return RewriteRule classes
+   */
+  public abstract Collection<Class<? extends LogicalPlanRewriteRule>> getPostRules();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanTestRuleProvider.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanTestRuleProvider.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanTestRuleProvider.java
new file mode 100644
index 0000000..704e7ed
--- /dev/null
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/LogicalPlanTestRuleProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.plan.rewrite;
+
+import com.google.common.collect.Lists;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.rewrite.rules.LogicalPlanEqualityTester;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * It is used only for testing.
+ */
+@SuppressWarnings("unused")
+public class LogicalPlanTestRuleProvider extends BaseLogicalPlanRewriteRuleProvider {
+
+  public LogicalPlanTestRuleProvider(TajoConf conf) {
+    super(conf);
+  }
+
+  @Override
+  public Collection<Class<? extends LogicalPlanRewriteRule>> getPostRules() {
+    List<Class<? extends LogicalPlanRewriteRule>> injectedRules = Lists.newArrayList(super.getPostRules());
+    injectedRules.add(LogicalPlanEqualityTester.class);
+    return injectedRules;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/32be38d4/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
deleted file mode 100644
index b7f5637..0000000
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/QueryRewriteEngine.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.plan.rewrite;
-
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-
-public interface QueryRewriteEngine {
-  /**
-   * Rewrite a logical plan with all query rewrite rules added to this engine.
-   *
-   * @param plan The plan to be rewritten with all query rewrite rule.
-   * @return The rewritten plan.
-   */
-  LogicalPlan rewrite(LogicalPlan plan) throws PlanningException;
-}