You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/25 07:35:01 UTC

[1/3] TAJO-539: Change some EvalNode::eval to directly return a Datum value.

Updated Branches:
  refs/heads/master 0386268e8 -> e23e78ccd


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
index 6264961..e70e6c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/ProjectionPushDownRule.java
@@ -23,10 +23,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.SchemaUtil;
@@ -412,12 +409,19 @@ public class ProjectionPushDownRule extends
       }
     }
 
-    // Getting target names
-    final int targetNum = node.getTargets().length;
-    final String [] targetNames = new String[targetNum];
-    for (int i = 0; i < targetNum; i++) {
-      Target target = node.getTargets()[i];
-      targetNames[i] = newContext.addExpr(target);
+    // Getting eval names
+
+    final String [] aggEvalNames;
+    if (node.hasAggFunctions()) {
+      final int evalNum = node.getAggFunctions().length;
+      aggEvalNames = new String[evalNum];
+      for (int evalIdx = 0, targetIdx = groupingKeyNum; targetIdx < node.getTargets().length; evalIdx++, targetIdx++) {
+        Target target = node.getTargets()[targetIdx];
+        EvalNode evalNode = node.getAggFunctions()[evalIdx];
+        aggEvalNames[evalIdx] = newContext.addExpr(new Target(evalNode, target.getCanonicalName()));
+      }
+    } else {
+      aggEvalNames = null;
     }
 
     // visit a child node
@@ -444,23 +448,50 @@ public class ProjectionPushDownRule extends
     }
 
     // Getting projected targets
-    List<Target> projectedTargets = TUtil.newList();
-    for (Iterator<String> it = getFilteredReferences(targetNames, context.requiredSet); it.hasNext();) {
-      String referenceName = it.next();
-      Target target = context.targetListMgr.getTarget(referenceName);
+    if (node.hasAggFunctions()) {
+      AggregationFunctionCallEval [] aggEvals = new AggregationFunctionCallEval[aggEvalNames.length];
+      int i = 0;
+      for (Iterator<String> it = getFilteredReferences(aggEvalNames, TUtil.newList(aggEvalNames)); it.hasNext();) {
 
-      if (context.targetListMgr.isResolved(referenceName)) {
-        projectedTargets.add(new Target(new FieldEval(target.getNamedColumn())));
-      } else if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) {
-        projectedTargets.add(target);
-        context.targetListMgr.resolve(target);
+        String referenceName = it.next();
+        Target target = context.targetListMgr.getTarget(referenceName);
+
+        if (LogicalPlanner.checkIfBeEvaluatedAtGroupBy(target.getEvalTree(), node)) {
+          aggEvals[i++] = target.getEvalTree();
+          context.targetListMgr.resolve(target);
+        }
+      }
+      if (aggEvals.length > 0) {
+        node.setAggFunctions(aggEvals);
       }
     }
     node.setInSchema(child.getOutSchema());
-    node.setTargets(projectedTargets.toArray(new Target[projectedTargets.size()]));
+    Target [] targets = buildGroupByTarget(node, aggEvalNames);
+    node.setTargets(targets);
+
     return node;
   }
 
+  public static Target [] buildGroupByTarget(GroupbyNode groupbyNode, String [] aggEvalNames) {
+    final int groupingKeyNum = groupbyNode.getGroupingColumns().length;
+    final int aggrFuncNum = aggEvalNames != null ? aggEvalNames.length : 0;
+    EvalNode [] aggEvalNodes = groupbyNode.getAggFunctions();
+    Target [] targets = new Target[groupingKeyNum + aggrFuncNum];
+
+    for (int groupingKeyIdx = 0; groupingKeyIdx < groupingKeyNum; groupingKeyIdx++) {
+      targets[groupingKeyIdx] = new Target(new FieldEval(groupbyNode.getGroupingColumns()[groupingKeyIdx]));
+    }
+
+    if (aggEvalNames != null) {
+      for (int aggrFuncIdx = 0, targetIdx = groupingKeyNum; aggrFuncIdx < aggrFuncNum; aggrFuncIdx++, targetIdx++) {
+        targets[targetIdx] =
+            new Target(new FieldEval(aggEvalNames[aggrFuncIdx], aggEvalNodes[aggrFuncIdx].getValueType()));
+      }
+    }
+
+    return targets;
+  }
+
   public LogicalNode visitFilter(Context context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  SelectionNode node, Stack<LogicalNode> stack) throws PlanningException {
     Context newContext = new Context(context);
@@ -548,14 +579,14 @@ public class ProjectionPushDownRule extends
     return new FilteredStringsIterator(targetNames, required);
   }
 
-  static Iterator<String> getFilteredReferences(String [] targetNames, Set<String> required) {
+  static Iterator<String> getFilteredReferences(String [] targetNames, Collection<String> required) {
     return new FilteredStringsIterator(targetNames, required);
   }
 
   static class FilteredStringsIterator implements Iterator<String> {
     Iterator<String> iterator;
 
-    FilteredStringsIterator(Collection<String> targetNames, Set<String> required) {
+    FilteredStringsIterator(Collection<String> targetNames, Collection<String> required) {
       List<String> filtered = TUtil.newList();
       for (String name : targetNames) {
         if (required.contains(name)) {
@@ -566,7 +597,7 @@ public class ProjectionPushDownRule extends
       iterator = filtered.iterator();
     }
 
-    FilteredStringsIterator(String [] targetNames, Set<String> required) {
+    FilteredStringsIterator(String [] targetNames, Collection<String> required) {
       this(TUtil.newList(targetNames), required);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
index e7eaf40..42508a0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java
@@ -32,14 +32,12 @@ import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.dataserver.HttpUtil;
 
 import java.io.UnsupportedEncodingException;
@@ -418,14 +416,12 @@ public class TupleUtil {
 
   private static class TupleBlockFilterScanner {
     private EvalNode qual;
-    private EvalContext qualCtx;
     private Iterator<Tuple> iterator;
     private Schema schema;
 
     public TupleBlockFilterScanner(Schema schema, Collection<Tuple> tuples, EvalNode qual) {
       this.schema = schema;
       this.qual = qual;
-      this.qualCtx = qual.newContext();
       this.iterator = tuples.iterator();
     }
 
@@ -435,8 +431,7 @@ public class TupleUtil {
       Tuple tuple;
       while (iterator.hasNext()) {
         tuple = iterator.next();
-        qual.eval(qualCtx, schema, tuple);
-        if (qual.terminate(qualCtx).asBool()) {
+        if (qual.eval(schema, tuple).isTrue()) {
           results.add(tuple);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
index cfe3b61..a969326 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -83,7 +83,7 @@ public class GlobalEngine extends AbstractService {
     try  {
       analyzer = new SQLAnalyzer();
       converter = new HiveConverter();
-      preVerifier = new PreLogicalPlanVerifier();
+      preVerifier = new PreLogicalPlanVerifier(context.getCatalog());
       planner = new LogicalPlanner(context.getCatalog());
       optimizer = new LogicalOptimizer(context.getConf());
       annotatedPlanVerifier = new LogicalPlanVerifier(context.getConf(), context.getCatalog());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 07655a2..299703c 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -130,13 +130,10 @@ public class ExprTestBase {
     try {
       targets = getRawTargets(query);
 
-      EvalContext [] evalContexts = new EvalContext[targets.length];
       Tuple outTuple = new VTuple(targets.length);
       for (int i = 0; i < targets.length; i++) {
         EvalNode eval = targets[i].getEvalTree();
-        evalContexts[i] = eval.newContext();
-        eval.eval(evalContexts[i], inputSchema, vtuple);
-        outTuple.put(i, eval.terminate(evalContexts[i]));
+        outTuple.put(i, eval.eval(inputSchema, vtuple));
       }
 
       for (int i = 0; i < expected.length; i++) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
index 34ba1ca..0489e37 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTree.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.engine.eval;
 
 import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -28,13 +27,6 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.engine.json.CoreGsonHelper;
-import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlanningException;
-import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.SelectionNode;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
@@ -43,8 +35,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-
 import static org.apache.tajo.common.TajoDataTypes.Type.*;
 import static org.junit.Assert.*;
 
@@ -136,15 +126,13 @@ public class TestEvalTree extends ExprTestBase{
     schema1.addColumn("table1.score", INT4);
     
     BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2);
-    EvalContext evalCtx = expr.newContext();
     assertCloneEqual(expr);
     VTuple tuple = new VTuple(2);
     tuple.put(0, DatumFactory.createInt4(1)); // put 0th field
     tuple.put(1, DatumFactory.createInt4(99)); // put 0th field
 
     // the result of evaluation must be 100.
-    expr.eval(evalCtx, schema1, tuple);
-    assertEquals(expr.terminate(evalCtx).asInt4(), 100);
+    assertEquals(expr.eval(schema1, tuple).asInt4(), 100);
   }
 
   public static class MockTrueEval extends EvalNode {
@@ -159,7 +147,7 @@ public class TestEvalTree extends ExprTestBase{
     }
 
     @Override
-    public Datum terminate(EvalContext ctx) {
+    public Datum eval(Schema schema, Tuple tuple) {
       return DatumFactory.createBool(true);
     }
 
@@ -169,11 +157,6 @@ public class TestEvalTree extends ExprTestBase{
     }
 
     @Override
-    public EvalContext newContext() {
-      return null;
-    }
-
-    @Override
     public DataType getValueType() {
       return CatalogUtil.newSimpleDataType(BOOLEAN);
     }
@@ -187,12 +170,7 @@ public class TestEvalTree extends ExprTestBase{
     }
 
     @Override
-    public EvalContext newContext() {
-      return null;
-    }
-
-    @Override
-    public Datum terminate(EvalContext ctx) {
+    public Datum eval(Schema schema, Tuple tuple) {
       return DatumFactory.createBool(false);
     }
 
@@ -218,24 +196,16 @@ public class TestEvalTree extends ExprTestBase{
     MockFalseExpr falseExpr = new MockFalseExpr();
 
     BinaryEval andExpr = new BinaryEval(EvalType.AND, trueExpr, trueExpr);
-    EvalContext evalCtx = andExpr.newContext();
-    andExpr.eval(evalCtx, null, null);
-    assertTrue(andExpr.terminate(evalCtx).asBool());
+    assertTrue(andExpr.eval(null, null).asBool());
 
     andExpr = new BinaryEval(EvalType.AND, falseExpr, trueExpr);
-    evalCtx = andExpr.newContext();
-    andExpr.eval(evalCtx, null, null);
-    assertFalse(andExpr.terminate(evalCtx).asBool());
+    assertFalse(andExpr.eval(null, null).asBool());
 
     andExpr = new BinaryEval(EvalType.AND, trueExpr, falseExpr);
-    evalCtx= andExpr.newContext();
-    andExpr.eval(evalCtx, null, null);
-    assertFalse(andExpr.terminate(evalCtx).asBool());
+    assertFalse(andExpr.eval(null, null).asBool());
 
     andExpr = new BinaryEval(EvalType.AND, falseExpr, falseExpr);
-    evalCtx= andExpr.newContext();
-    andExpr.eval(evalCtx, null, null);
-    assertFalse(andExpr.terminate(evalCtx).asBool());
+    assertFalse(andExpr.eval(null, null).asBool());
   }
 
   @Test
@@ -244,24 +214,16 @@ public class TestEvalTree extends ExprTestBase{
     MockFalseExpr falseExpr = new MockFalseExpr();
 
     BinaryEval orExpr = new BinaryEval(EvalType.OR, trueExpr, trueExpr);
-    EvalContext evalCtx= orExpr.newContext();
-    orExpr.eval(evalCtx, null, null);
-    assertTrue(orExpr.terminate(evalCtx).asBool());
+    assertTrue(orExpr.eval(null, null).asBool());
 
     orExpr = new BinaryEval(EvalType.OR, falseExpr, trueExpr);
-    evalCtx= orExpr.newContext();
-    orExpr.eval(evalCtx, null, null);
-    assertTrue(orExpr.terminate(evalCtx).asBool());
+    assertTrue(orExpr.eval(null, null).asBool());
 
     orExpr = new BinaryEval(EvalType.OR, trueExpr, falseExpr);
-    evalCtx= orExpr.newContext();
-    orExpr.eval(evalCtx, null, null);
-    assertTrue(orExpr.terminate(evalCtx).asBool());
+    assertTrue(orExpr.eval(null, null).asBool());
 
     orExpr = new BinaryEval(EvalType.OR, falseExpr, falseExpr);
-    evalCtx = orExpr.newContext();
-    orExpr.eval(evalCtx, null, null);
-    assertFalse(orExpr.terminate(evalCtx).asBool());
+    assertFalse(orExpr.eval(null, null).asBool());
   }
 
   @Test
@@ -274,73 +236,41 @@ public class TestEvalTree extends ExprTestBase{
     e1 = new ConstEval(DatumFactory.createInt4(9));
     e2 = new ConstEval(DatumFactory.createInt4(34));
     expr = new BinaryEval(EvalType.LTH, e1, e2);
-    EvalContext evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.LEQ, e1, e2);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.LTH, e2, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.LEQ, e2, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
 
     expr = new BinaryEval(EvalType.GTH, e2, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.GEQ, e2, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.GTH, e1, e2);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.GEQ, e1, e2);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
 
     BinaryEval plus = new BinaryEval(EvalType.PLUS, e1, e2);
     expr = new BinaryEval(EvalType.LTH, e1, plus);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.LEQ, e1, plus);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.LTH, plus, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.LEQ, plus, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
 
     expr = new BinaryEval(EvalType.GTH, plus, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.GEQ, plus, e1);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.GTH, e1, plus);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
     expr = new BinaryEval(EvalType.GEQ, e1, plus);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertFalse(expr.terminate(evalCtx).asBool());
+    assertFalse(expr.eval(null, null).asBool());
   }
 
   @Test
@@ -353,36 +283,28 @@ public class TestEvalTree extends ExprTestBase{
     e1 = new ConstEval(DatumFactory.createInt4(9));
     e2 = new ConstEval(DatumFactory.createInt4(34));
     BinaryEval expr = new BinaryEval(EvalType.PLUS, e1, e2);
-    EvalContext evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertEquals(expr.terminate(evalCtx).asInt4(), 43);
+    assertEquals(expr.eval(null, null).asInt4(), 43);
     assertCloneEqual(expr);
     
     // MINUS
     e1 = new ConstEval(DatumFactory.createInt4(5));
     e2 = new ConstEval(DatumFactory.createInt4(2));
     expr = new BinaryEval(EvalType.MINUS, e1, e2);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertEquals(expr.terminate(evalCtx).asInt4(), 3);
+    assertEquals(expr.eval(null, null).asInt4(), 3);
     assertCloneEqual(expr);
     
     // MULTIPLY
     e1 = new ConstEval(DatumFactory.createInt4(5));
     e2 = new ConstEval(DatumFactory.createInt4(2));
     expr = new BinaryEval(EvalType.MULTIPLY, e1, e2);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertEquals(expr.terminate(evalCtx).asInt4(), 10);
+    assertEquals(expr.eval(null, null).asInt4(), 10);
     assertCloneEqual(expr);
     
     // DIVIDE
     e1 = new ConstEval(DatumFactory.createInt4(10));
     e2 = new ConstEval(DatumFactory.createInt4(5));
     expr = new BinaryEval(EvalType.DIVIDE, e1, e2);
-    evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertEquals(expr.terminate(evalCtx).asInt4(), 2);
+    assertEquals(expr.eval(null, null).asInt4(), 2);
     assertCloneEqual(expr);
   }
 
@@ -398,9 +320,7 @@ public class TestEvalTree extends ExprTestBase{
     assertEquals(CatalogUtil.newSimpleDataType(INT4), expr.getValueType());
 
     expr = new BinaryEval(EvalType.LTH, e1, e2);
-    EvalContext evalCtx = expr.newContext();
-    expr.eval(evalCtx, null, null);
-    assertTrue(expr.terminate(evalCtx).asBool());
+    assertTrue(expr.eval(null, null).asBool());
     assertEquals(CatalogUtil.newSimpleDataType(BOOLEAN), expr.getValueType());
 
     e1 = new ConstEval(DatumFactory.createFloat8(9.3));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 86c1c01..86d603e 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -219,19 +219,15 @@ public class TestEvalTreeUtil {
     EvalNode first = cnf[0];
     EvalNode second = cnf[1];
     
-    FieldEval field = (FieldEval) first.getLeftExpr();
+    FieldEval field = first.getLeftExpr();
     assertEquals(col1, field.getColumnRef());
     assertEquals(EvalType.LTH, first.getType());
-    EvalContext firstRCtx = first.getRightExpr().newContext();
-    first.getRightExpr().eval(firstRCtx, null,  null);
-    assertEquals(10, first.getRightExpr().terminate(firstRCtx).asInt4());
+    assertEquals(10, first.getRightExpr().eval(null,  null).asInt4());
     
-    field = (FieldEval) second.getRightExpr();
+    field = second.getRightExpr();
     assertEquals(col1, field.getColumnRef());
     assertEquals(EvalType.LTH, second.getType());
-    EvalContext secondLCtx = second.getLeftExpr().newContext();
-    second.getLeftExpr().eval(secondLCtx, null,  null);
-    assertEquals(4, second.getLeftExpr().terminate(secondLCtx).asInt4());
+    assertEquals(4, second.getLeftExpr().eval(null,  null).asInt4());
   }
   
   @Test
@@ -264,15 +260,11 @@ public class TestEvalTreeUtil {
   public final void testSimplify() throws PlanningException {
     Target [] targets = getRawTargets(QUERIES[0]);
     EvalNode node = AlgebraicUtil.eliminateConstantExprs(targets[0].getEvalTree());
-    EvalContext nodeCtx = node.newContext();
     assertEquals(EvalType.CONST, node.getType());
-    node.eval(nodeCtx, null, null);
-    assertEquals(7, node.terminate(nodeCtx).asInt4());
+    assertEquals(7, node.eval(null, null).asInt4());
     node = AlgebraicUtil.eliminateConstantExprs(targets[1].getEvalTree());
     assertEquals(EvalType.CONST, node.getType());
-    nodeCtx = node.newContext();
-    node.eval(nodeCtx, null, null);
-    assertTrue(7.0d == node.terminate(nodeCtx).asFloat8());
+    assertTrue(7.0d == node.eval(null, null).asFloat8());
 
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalPlan plan = planner.createPlan(expr);
@@ -300,9 +292,7 @@ public class TestEvalTreeUtil {
     assertEquals(EvalType.GTH, transposed.getType());
     FieldEval field = transposed.getLeftExpr();
     assertEquals(col1, field.getColumnRef());
-    EvalContext evalCtx = transposed.getRightExpr().newContext();
-    transposed.getRightExpr().eval(evalCtx, null, null);
-    assertEquals(1, transposed.getRightExpr().terminate(evalCtx).asInt4());
+    assertEquals(1, transposed.getRightExpr().eval(null, null).asInt4());
 
     node = getRootSelection(QUERIES[4]);
     // we expect that score < 3
@@ -310,9 +300,7 @@ public class TestEvalTreeUtil {
     assertEquals(EvalType.LTH, transposed.getType());
     field = transposed.getLeftExpr();
     assertEquals(col1, field.getColumnRef());
-    evalCtx = transposed.getRightExpr().newContext();
-    transposed.getRightExpr().eval(evalCtx, null, null);
-    assertEquals(2, transposed.getRightExpr().terminate(evalCtx).asInt4());
+    assertEquals(2, transposed.getRightExpr().eval(null, null).asInt4());
   }
 
   @Test
@@ -321,11 +309,11 @@ public class TestEvalTreeUtil {
     Expr expr = analyzer.parse(query);
     LogicalPlan plan = planner.createPlan(expr);
     GroupbyNode groupByNode = plan.getRootBlock().getNode(NodeType.GROUP_BY);
-    Target [] targets = groupByNode.getTargets();
+    EvalNode [] aggEvals = groupByNode.getAggFunctions();
 
     List<AggregationFunctionCallEval> list = new ArrayList<AggregationFunctionCallEval>();
-    for (int i = 0; i < targets.length; i++) {
-      list.addAll(EvalTreeUtil.findDistinctAggFunction(targets[i].getEvalTree()));
+    for (int i = 0; i < aggEvals.length; i++) {
+      list.addAll(EvalTreeUtil.findDistinctAggFunction(aggEvals[i]));
     }
     assertEquals(2, list.size());
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 6758189..9eb0276 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -570,7 +570,7 @@ public class TestLogicalPlanner {
 
   static final String ALIAS [] = {
     "select deptName, sum(score) as total from score group by deptName",
-    "select em.empId as id, sum(score) as total from employee as em inner join score using (em.deptName)"
+    "select em.empId as id, sum(score) as total from employee as em inner join score using (em.deptName) group by id"
   };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 85ea4f2..00ce501 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -91,28 +91,6 @@ public class TestPlannerUtil {
   }
 
   @Test
-  public final void testTransformTwoPhase() throws PlanningException {
-    // without 'having clause'
-    Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[7]);
-    LogicalNode plan = planner.createPlan(expr).getRootBlock().getRoot();
-
-    assertEquals(NodeType.ROOT, plan.getType());
-    LogicalRootNode root = (LogicalRootNode) plan;
-    TestLogicalPlanner.testQuery7(root.getChild());
-    
-    root.postOrder(new TwoPhaseBuilder());
-  }
-  
-  private final class TwoPhaseBuilder implements LogicalNodeVisitor {
-    @Override
-    public void visit(LogicalNode node) {
-      if (node.getType() == NodeType.GROUP_BY) {
-        PlannerUtil.transformGroupbyTo2P((GroupbyNode) node);
-      }
-    }    
-  }
-  
-  @Test
   public final void testFindTopNode() throws CloneNotSupportedException, PlanningException {
     // two relations
     Expr expr = analyzer.parse(TestLogicalPlanner.QUERIES[1]);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 0f3c165..ab27a45 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,7 +25,9 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.*;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
@@ -35,14 +37,12 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.planner.logical.*;
-import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.*;
@@ -563,12 +563,8 @@ public class TestPhysicalPlanner {
 
     // Set all aggregation functions to the first phase mode
     GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
-    for (Target target : groupbyNode.getTargets()) {
-      for (EvalNode eval : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
-        if (eval instanceof AggregationFunctionCallEval) {
-          ((AggregationFunctionCallEval) eval).setFirstPhase();
-        }
-      }
+    for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+      function.setFirstPhase();
     }
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
@@ -597,12 +593,8 @@ public class TestPhysicalPlanner {
 
     // Set all aggregation functions to the first phase mode
     GroupbyNode groupbyNode = PlannerUtil.findTopNode(rootNode, NodeType.GROUP_BY);
-    for (Target target : groupbyNode.getTargets()) {
-      for (EvalNode eval : EvalTreeUtil.findDistinctAggFunction(target.getEvalTree())) {
-        if (eval instanceof AggregationFunctionCallEval) {
-          ((AggregationFunctionCallEval) eval).setFirstPhase();
-        }
-      }
+    for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+      function.setFirstPhase();
     }
 
     PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);


[2/3] TAJO-539: Change some EvalNode::eval to directly return a Datum value.

Posted by hy...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 341a58a..3e756d5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -31,9 +31,12 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.storage.AbstractStorageManager;
 
 import java.io.IOException;
@@ -212,15 +215,12 @@ public class GlobalPlanner {
     // setup current block
     ExecutionBlock currentBlock = context.plan.newExecutionBlock();
     LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
-    for (Target target : groupbyNode.getTargets()) {
-      List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
-      for (AggregationFunctionCallEval function : functions) {
-        if (function.isDistinct()) {
-          columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
-        } else {
-          // See the comment of this method. the aggregation function should be executed as the first phase.
-          function.setFirstPhase();
-        }
+    for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+      if (function.isDistinct()) {
+        columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+      } else {
+        // See the comment of this method. the aggregation function should be executed as the first phase.
+        function.setFirstPhase();
       }
     }
 
@@ -247,65 +247,140 @@ public class GlobalPlanner {
     return currentBlock;
   }
 
-  private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
+  private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
                                       GroupbyNode groupbyNode) throws PlanningException {
 
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
-    if (groupbyNode.isDistinct()) {
-      return buildDistinctGroupBy(context, childBlock, groupbyNode);
+    if (groupbyNode.isDistinct()) { // if there is at one distinct aggregation function
+      return buildDistinctGroupBy(context, lastBlock, groupbyNode);
     } else {
-      GroupbyNode firstPhaseGroupBy = PlannerUtil.transformGroupbyTo2P(groupbyNode);
+      GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
 
-      if (firstPhaseGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
-          ((TableSubQueryNode)firstPhaseGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+      if (hasUnionChild(firstPhaseGroupby)) {
+        currentBlock = buildGroupbyAndUnionPlan(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+      } else {
+        // general hash-shuffled aggregation
+        currentBlock = buildTwoPhaseGroupby(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+      }
+    }
 
-        currentBlock = childBlock;
-        for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
-          if (firstPhaseGroupBy.isEmptyGrouping()) {
-            dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
-          } else {
-            dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
-          }
-          dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+    return currentBlock;
+  }
 
-          ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
-          GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan(), firstPhaseGroupBy);
-          g1.setChild(subBlock.getPlan());
-          subBlock.setPlan(g1);
-
-          GroupbyNode g2 = PlannerUtil.clone(context.plan.getLogicalPlan(), groupbyNode);
-          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-          g2.setChild(scanNode);
-          currentBlock.setPlan(g2);
-        }
-      } else { // general hash-shuffled aggregation
-        childBlock.setPlan(firstPhaseGroupBy);
-        currentBlock = masterPlan.newExecutionBlock();
+  public boolean hasUnionChild(UnaryNode node) {
 
-        DataChannel channel;
-        if (firstPhaseGroupBy.isEmptyGrouping()) {
-          channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
-          channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
-        } else {
-          channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
-          channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
-        }
-        channel.setSchema(firstPhaseGroupBy.getOutSchema());
-        channel.setStoreType(storeType);
+    if (node.getChild().getType() == NodeType.TABLE_SUBQUERY) {
+      TableSubQueryNode tableSubQuery = node.getChild();
+      return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
+    }
+
+    return false;
+  }
 
-        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-        groupbyNode.setChild(scanNode);
-        groupbyNode.setInSchema(scanNode.getOutSchema());
-        currentBlock.setPlan(groupbyNode);
-        masterPlan.addConnect(channel);
+  private static ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+                                                  GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) {
+    DataChannel lastDataChannel = null;
+
+    // It pushes down the first phase group-by operator into all child blocks.
+    //
+    // (second phase)    G (currentBlock)
+    //                  /|\
+    //                / / | \
+    // (first phase) G G  G  G (child block)
+
+    // They are already connected one another.
+    // So, we don't need to connect them again.
+    for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
+      if (firstPhaseGroupBy.isEmptyGrouping()) {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
+      } else {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
       }
+      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+      ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+
+      // Why must firstPhaseGroupby be copied?
+      //
+      // A groupby in each execution block can have different child.
+      // It affects groupby's input schema.
+      GroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+      childBlock.setPlan(firstPhaseGroupbyCopy);
+
+      // just keep the last data channel.
+      lastDataChannel = dataChannel;
+    }
+
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+    secondPhaseGroupBy.setChild(scanNode);
+    lastBlock.setPlan(secondPhaseGroupBy);
+    return lastBlock;
+  }
+
+  private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
+                                                     GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+    ExecutionBlock childBlock = latestBlock;
+    childBlock.setPlan(firstPhaseGroupby);
+    ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
+
+    DataChannel channel;
+    if (firstPhaseGroupby.isEmptyGrouping()) {
+      channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
+      channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
+    } else {
+      channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+      channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
     }
+    channel.setSchema(firstPhaseGroupby.getOutSchema());
+    channel.setStoreType(storeType);
+
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+    secondPhaseGroupby.setChild(scanNode);
+    secondPhaseGroupby.setInSchema(scanNode.getOutSchema());
+    currentBlock.setPlan(secondPhaseGroupby);
+
+    masterPlan.addConnect(channel);
 
     return currentBlock;
   }
 
+  public static GroupbyNode createFirstPhaseGroupBy(LogicalPlan plan, GroupbyNode groupBy) {
+    Preconditions.checkNotNull(groupBy);
+
+    GroupbyNode firstPhaseGroupBy = PlannerUtil.clone(plan, groupBy);
+    GroupbyNode secondPhaseGroupBy = groupBy;
+
+    // Set first phase expressions
+    if (secondPhaseGroupBy.hasAggFunctions()) {
+      int evalNum = secondPhaseGroupBy.getAggFunctions().length;
+      AggregationFunctionCallEval [] secondPhaseEvals = secondPhaseGroupBy.getAggFunctions();
+      AggregationFunctionCallEval [] firstPhaseEvals = new AggregationFunctionCallEval[evalNum];
+
+      String [] firstPhaseEvalNames = new String[evalNum];
+      for (int i = 0; i < evalNum; i++) {
+        try {
+          firstPhaseEvals[i] = (AggregationFunctionCallEval) secondPhaseEvals[i].clone();
+        } catch (CloneNotSupportedException e) {
+          throw new RuntimeException(e);
+        }
+
+        firstPhaseEvals[i].setFirstPhase();
+        firstPhaseEvalNames[i] = plan.newGeneratedFieldName(firstPhaseEvals[i]);
+        FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
+        secondPhaseEvals[i].setArgs(new EvalNode[] {param});
+      }
+
+      secondPhaseGroupBy.setAggFunctions(secondPhaseEvals);
+      firstPhaseGroupBy.setAggFunctions(firstPhaseEvals);
+      Target [] firstPhaseTargets = ProjectionPushDownRule.buildGroupByTarget(firstPhaseGroupBy, firstPhaseEvalNames);
+      firstPhaseGroupBy.setTargets(firstPhaseTargets);
+      secondPhaseGroupBy.setInSchema(PlannerUtil.targetToSchema(firstPhaseTargets));
+    }
+    return firstPhaseGroupBy;
+  }
+
   private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 41f1e88..8acd32e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -20,14 +20,22 @@ package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
 public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
-	@Expose private Column [] columns;
-	@Expose private Target [] targets;
+	/** Grouping key sets */
+  @Expose private Column [] groupingColumns;
+  /** Aggregation Functions */
+  @Expose private AggregationFunctionCallEval [] aggrFunctions;
+  /**
+   * It's a list of targets. The grouping columns should be followed by aggregation functions.
+   * aggrFunctions keep actual aggregation functions, but it only contains field references.
+   * */
+  @Expose private Target [] targets;
   @Expose private boolean hasDistinct = false;
 
   public GroupbyNode(int pid) {
@@ -35,15 +43,15 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   }
 
   public final boolean isEmptyGrouping() {
-    return columns == null || columns.length == 0;
+    return groupingColumns == null || groupingColumns.length == 0;
   }
 
   public void setGroupingColumns(Column [] groupingColumns) {
-    this.columns = groupingColumns;
+    this.groupingColumns = groupingColumns;
   }
 
 	public final Column [] getGroupingColumns() {
-	  return this.columns;
+	  return this.groupingColumns;
 	}
 
   public final boolean isDistinct() {
@@ -54,6 +62,18 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     hasDistinct = distinct;
   }
 
+  public boolean hasAggFunctions() {
+    return this.aggrFunctions != null;
+  }
+
+  public AggregationFunctionCallEval [] getAggFunctions() {
+    return this.aggrFunctions;
+  }
+
+  public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+    this.aggrFunctions = evals;
+  }
+
   @Override
   public boolean hasTargets() {
     return this.targets != null;
@@ -76,9 +96,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   
   public String toString() {
     StringBuilder sb = new StringBuilder("\"GroupBy\": {\"grouping fields\":[");
-    for (int i=0; i < columns.length; i++) {
-      sb.append("\"").append(columns[i]).append("\"");
-      if(i < columns.length - 1)
+    for (int i=0; i < groupingColumns.length; i++) {
+      sb.append("\"").append(groupingColumns[i]).append("\"");
+      if(i < groupingColumns.length - 1)
         sb.append(",");
     }
 
@@ -92,6 +112,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
       }
       sb.append("],");
     }
+    if (aggrFunctions != null) {
+      sb.append("\n  \"expr\": ").append(TUtil.arrayToString(aggrFunctions)).append(",");
+    }
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",");
     sb.append("\n  \"in schema\": ").append(getInSchema());
     sb.append("}");
@@ -104,7 +127,8 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     if (obj instanceof GroupbyNode) {
       GroupbyNode other = (GroupbyNode) obj;
       boolean eq = super.equals(other);
-      eq = eq && TUtil.checkEquals(columns, other.columns);
+      eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
+      eq = eq && TUtil.checkEquals(aggrFunctions, other.aggrFunctions);
       eq = eq && TUtil.checkEquals(targets, other.targets);
       return eq;
     } else {
@@ -115,10 +139,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     GroupbyNode grp = (GroupbyNode) super.clone();
-    if (columns != null) {
-      grp.columns = new Column[columns.length];
-      for (int i = 0; i < columns.length; i++) {
-        grp.columns[i] = (Column) columns[i].clone();
+    if (groupingColumns != null) {
+      grp.groupingColumns = new Column[groupingColumns.length];
+      for (int i = 0; i < groupingColumns.length; i++) {
+        grp.groupingColumns[i] = (Column) groupingColumns[i].clone();
+      }
+    }
+
+    if (aggrFunctions != null) {
+      grp.aggrFunctions = new AggregationFunctionCallEval[aggrFunctions.length];
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        grp.aggrFunctions[i] = (AggregationFunctionCallEval) aggrFunctions[i].clone();
       }
     }
 
@@ -138,7 +169,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
 
     StringBuilder sb = new StringBuilder();
     sb.append("(");
-    Column [] groupingColumns = columns;
+    Column [] groupingColumns = this.groupingColumns;
     for (int j = 0; j < groupingColumns.length; j++) {
       sb.append(groupingColumns[j].getColumnName());
       if(j < groupingColumns.length - 1) {
@@ -150,6 +181,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
 
     planStr.appendTitle(sb.toString());
 
+    sb = new StringBuilder();
+    sb.append("(");
+    for (int j = 0; j < aggrFunctions.length; j++) {
+      sb.append(aggrFunctions[j]);
+      if(j < aggrFunctions.length - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append(")");
+    planStr.appendExplain("exprs: ").appendExplain(sb.toString());
+
     sb = new StringBuilder("target list: ");
     for (int i = 0; i < targets.length; i++) {
       sb.append(targets[i]);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index ed82cef..eb69703 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -18,31 +18,22 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import com.google.common.collect.Sets;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.ConstEval;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Set;
 
 public abstract class AggregationExec extends UnaryPhysicalExec {
   protected GroupbyNode plan;
 
-  protected Set<Column> nonNullGroupingFields;
-  protected int keylist [];
-  protected int measureList[];
-  protected final EvalNode evals [];
-  protected EvalContext evalContexts [];
+  protected final int groupingKeyNum;
+  protected int groupingKeyIds[];
+  protected final int aggFunctionsNum;
+  protected final AggregationFunctionCallEval aggFunctions[];
+
   protected Schema evalSchema;
 
   public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
@@ -52,47 +43,26 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
 
     evalSchema = plan.getOutSchema();
 
-    nonNullGroupingFields = Sets.newHashSet();
-    // keylist will contain a list of IDs of grouping column
-    keylist = new int[plan.getGroupingColumns().length];
+    final Column [] keyColumns = plan.getGroupingColumns();
+    groupingKeyNum = keyColumns.length;
+    groupingKeyIds = new int[groupingKeyNum];
     Column col;
     for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) {
-      col = plan.getGroupingColumns()[idx];
-      keylist[idx] = inSchema.getColumnId(col.getQualifiedName());
-      nonNullGroupingFields.add(col);
-    }
-
-    // measureList will contain a list of measure field indexes against the target list.
-    List<Integer> measureIndexes = TUtil.newList();
-    for (int i = 0; i < plan.getTargets().length; i++) {
-      Target target = plan.getTargets()[i];
-      if (target.getEvalTree().getType() == EvalType.AGG_FUNCTION) {
-        measureIndexes.add(i);
-      }
-    }
-
-    measureList = new int[measureIndexes.size()];
-    for (int i = 0; i < measureIndexes.size(); i++) {
-      measureList[i] = measureIndexes.get(i);
+      col = keyColumns[idx];
+      groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName());
     }
 
-    evals = new EvalNode[plan.getTargets().length];
-    evalContexts = new EvalContext[plan.getTargets().length];
-    for (int i = 0; i < plan.getTargets().length; i++) {
-      Target t = plan.getTargets()[i];
-      if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getNamedColumn())) {
-        evals[i] = new ConstEval(DatumFactory.createNullDatum());
-        evalContexts[i] = evals[i].newContext();
-      } else {
-        evals[i] = t.getEvalTree();
-        evalContexts[i] = evals[i].newContext();
-      }
+    if (plan.hasAggFunctions()) {
+      aggFunctions = plan.getAggFunctions();
+      aggFunctionsNum = aggFunctions.length;
+    } else {
+      aggFunctions = new AggregationFunctionCallEval[0];
+      aggFunctionsNum = 0;
     }
   }
 
   @Override
   public void close() throws IOException {
     super.close();
-    nonNullGroupingFields.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index cf56200..b39a9f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,15 +18,14 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,7 +37,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
   private final JoinNode plan;
   private final boolean hasJoinQual;
   private final EvalNode joinQual;
-  private final EvalContext qualCtx;
 
   private final List<Tuple> leftTupleSlots;
   private final List<Tuple> rightTupleSlots;
@@ -58,7 +56,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
                      final PhysicalExec leftExec, PhysicalExec rightExec) {
@@ -67,10 +64,8 @@ public class BNLJoinExec extends BinaryPhysicalExec {
     this.joinQual = plan.getJoinQual();
     if (joinQual != null) { // if join type is not 'cross join'
       hasJoinQual = true;
-      this.qualCtx = this.joinQual.newContext();
     } else {
       hasJoinQual = false;
-      this.qualCtx = null;
     }
     this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
     this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
@@ -85,7 +80,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
     }
 
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -191,15 +185,12 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
       frameTuple.set(leftTuple, rightIterator.next());
       if (hasJoinQual) {
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) {
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outputTuple);
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+          projector.eval(frameTuple, outputTuple);
           return outputTuple;
         }
       } else {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outputTuple);
+        projector.eval(frameTuple, outputTuple);
         return outputTuple;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 30c1fa7..2ff6fc9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,16 +19,15 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -37,11 +36,9 @@ public class BSTIndexScanExec extends PhysicalExec {
   private SeekableScanner fileScanner;
   
   private EvalNode qual;
-  private EvalContext qualCtx;
   private BSTIndex.BSTIndexReader reader;
   
   private final Projector projector;
-  private EvalContext [] evalContexts;
   
   private Datum[] datum = null;
   
@@ -54,18 +51,12 @@ public class BSTIndexScanExec extends PhysicalExec {
     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
     this.scanNode = scanNode;
     this.qual = scanNode.getQual();
-    if(this.qual == null) {
-      this.qualCtx = null;
-    } else {
-      this.qualCtx = this.qual.newContext();
-    }
     this.datum = datum;
 
     this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
         scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
     this.fileScanner.init();
     this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
-    this.evalContexts = projector.newContexts();
 
     this.reader = new BSTIndex(sm.getFileSystem().getConf()).
         getIndexReader(fileName, keySchema, comparator);
@@ -109,18 +100,15 @@ public class BSTIndexScanExec extends PhysicalExec {
     Tuple outTuple = new VTuple(this.outSchema.getColumnNum());
     if (!scanNode.hasQual()) {
       if ((tuple = fileScanner.next()) != null) {
-        projector.eval(evalContexts, tuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(tuple, outTuple);
         return outTuple;
       } else {
         return null;
       }
     } else {
-       while( reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
-         qual.eval(qualCtx, inSchema, tuple);
-         if (qual.terminate(qualCtx).asBool()) {
-           projector.eval(evalContexts, tuple);
-           projector.terminate(evalContexts, outTuple);
+       while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
+         if (qual.eval(inSchema, tuple).isTrue()) {
+           projector.eval(tuple, outTuple);
            return outTuple;
          } else {
            fileScanner.seek(reader.next());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index 4480747..83580f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -18,43 +18,32 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.EvalExprNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 public class EvalExprExec extends PhysicalExec {
   private final EvalExprNode plan;
-  private final EvalContext[] evalContexts;
 
   public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
     super(context, plan.getInSchema(), plan.getOutSchema());
     this.plan = plan;
-
-    evalContexts = new EvalContext[plan.getTargets().length];
-    for (int i = 0; i < plan.getTargets().length; i++) {
-      evalContexts[i] = plan.getTargets()[i].getEvalTree().newContext();
-    }
   }
 
   @Override
   public void init() throws IOException {
   }
 
-  /* (non-Javadoc)
-  * @see PhysicalExec#next()
-  */
   @Override
   public Tuple next() throws IOException {    
     Target [] targets = plan.getTargets();
     Tuple t = new VTuple(targets.length);
     for (int i = 0; i < targets.length; i++) {
-      targets[i].getEvalTree().eval(evalContexts[i], inSchema, null);
-      t.put(i, targets[i].getEvalTree().terminate(evalContexts[i]));
+      t.put(i, targets[i].getEvalTree().eval(inSchema, null));
     }
     return t;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index ed2f275..d533b82 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
@@ -35,44 +35,38 @@ import java.util.Map.Entry;
  */
 public class HashAggregateExec extends AggregationExec {
   private Tuple tuple = null;
-  private Map<Tuple, EvalContext[]> tupleSlots;
+  private Map<Tuple, FunctionContext[]> hashTable;
   private boolean computed = false;
-  private Iterator<Entry<Tuple, EvalContext []>> iterator = null;
+  private Iterator<Entry<Tuple, FunctionContext []>> iterator = null;
 
-  /**
-   * @throws java.io.IOException
-	 * 
-	 */
-  public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode annotation,
-                           PhysicalExec subOp) throws IOException {
-    super(ctx, annotation, subOp);
-    tupleSlots = new HashMap<Tuple, EvalContext[]>(10000);
-    this.tuple = new VTuple(evalSchema.getColumnNum());
+  public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException {
+    super(ctx, plan, subOp);
+    hashTable = new HashMap<Tuple, FunctionContext []>(100000);
+    this.tuple = new VTuple(plan.getOutSchema().getColumnNum());
   }
   
   private void compute() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
-    int targetLength = plan.getTargets().length;
     while((tuple = child.next()) != null && !context.isStopped()) {
-      keyTuple = new VTuple(keylist.length);
+      keyTuple = new VTuple(groupingKeyIds.length);
       // build one key tuple
-      for(int i = 0; i < keylist.length; i++) {
-        keyTuple.put(i, tuple.get(keylist[i]));
+      for(int i = 0; i < groupingKeyIds.length; i++) {
+        keyTuple.put(i, tuple.get(groupingKeyIds[i]));
       }
       
-      if(tupleSlots.containsKey(keyTuple)) {
-        EvalContext [] tmpTuple = tupleSlots.get(keyTuple);
-        for(int i = 0; i < measureList.length; i++) {
-          evals[measureList[i]].eval(tmpTuple[measureList[i]], inSchema, tuple);
+      if(hashTable.containsKey(keyTuple)) {
+        FunctionContext [] contexts = hashTable.get(keyTuple);
+        for(int i = 0; i < aggFunctions.length; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
         }
       } else { // if the key occurs firstly
-        EvalContext evalCtx [] = new EvalContext[targetLength];
-        for(int i = 0; i < targetLength; i++) {
-          evalCtx[i] = evals[i].newContext();
-          evals[i].eval(evalCtx[i], inSchema, tuple);
+        FunctionContext contexts [] = new FunctionContext[aggFunctionsNum];
+        for(int i = 0; i < aggFunctionsNum; i++) {
+          contexts[i] = aggFunctions[i].newContext();
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
         }
-        tupleSlots.put(keyTuple, evalCtx);
+        hashTable.put(keyTuple, contexts);
       }
     }
   }
@@ -81,16 +75,23 @@ public class HashAggregateExec extends AggregationExec {
   public Tuple next() throws IOException {
     if(!computed) {
       compute();
-      iterator = tupleSlots.entrySet().iterator();
+      iterator = hashTable.entrySet().iterator();
       computed = true;
     }
 
-    EvalContext [] ctx;
+    FunctionContext [] contexts;
 
     if (iterator.hasNext()) {
-      ctx =  iterator.next().getValue();
-      for (int i = 0; i < ctx.length; i++) {
-        tuple.put(i, evals[i].terminate(ctx[i]));
+      Entry<Tuple, FunctionContext []> entry = iterator.next();
+      Tuple keyTuple = entry.getKey();
+      contexts =  entry.getValue();
+
+      int tupleIdx = 0;
+      for (; tupleIdx < groupingKeyNum; tupleIdx++) {
+        tuple.put(tupleIdx, keyTuple.get(tupleIdx));
+      }
+      for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) {
+        tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx]));
       }
 
       return tuple;
@@ -101,12 +102,12 @@ public class HashAggregateExec extends AggregationExec {
 
   @Override
   public void rescan() throws IOException {    
-    iterator = tupleSlots.entrySet().iterator();
+    iterator = hashTable.entrySet().iterator();
   }
 
   @Override
   public void close() throws IOException {
     super.close();
-    tupleSlots.clear();
+    hashTable.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index caea5d9..848e362 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -18,9 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
@@ -48,7 +47,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
   protected Tuple outTuple = null;
   protected Map<Tuple, List<Tuple>> tupleSlots;
   protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
   protected Tuple leftTuple;
   protected Tuple leftKeyTuple;
 
@@ -60,7 +58,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
 
   // projection
   protected final Projector projector;
-  protected final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -72,7 +69,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
         plan.getOutSchema(), outer, inner);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
 
     // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
@@ -95,7 +91,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -156,8 +151,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
           } else {
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
 
             return outTuple;
           }
@@ -173,8 +167,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
           //output a tuple with the nulls padded rightTuple
           Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
           frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
           // we simulate we found a match, which is exactly the null padded one
           shouldGetLeftTuple = true;
           return outTuple;
@@ -184,10 +177,9 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
       // getting a next right tuple on in-memory hash table.
       rightTuple = iterator.next();
       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
-      if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
         found = true;
         getKeyLeftTuple(leftTuple, leftKeyTuple);
         matched.put(leftKeyTuple, true);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index e08e07d..08b7035 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -18,9 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -29,6 +27,7 @@ import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
@@ -46,7 +45,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
   protected Tuple outTuple = null;
   protected Map<Tuple, List<Tuple>> tupleSlots;
   protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
   protected Tuple leftTuple;
   protected Tuple leftKeyTuple;
 
@@ -58,7 +56,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
 
   // projection
   protected final Projector projector;
-  protected final EvalContext [] evalContexts;
 
   public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
       PhysicalExec rightExec) {
@@ -66,7 +63,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
         leftExec, rightExec);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
 
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
@@ -85,7 +81,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -131,10 +126,8 @@ public class HashJoinExec extends BinaryPhysicalExec {
       // getting a next right tuple on in-memory hash table.
       rightTuple = iterator.next();
       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
         found = true;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 6b2d7b8..d7437c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -80,8 +80,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
       } else {
         // if not found, it returns a tuple.
         frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(frameTuple, outTuple);
         return outTuple;
       }
 
@@ -91,16 +90,14 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
       while (notFound && iterator.hasNext()) {
         rightTuple = iterator.next();
         frameTuple.set(leftTuple, rightTuple);
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
           notFound = false;
         }
       }
 
       if (notFound) { // if there is no matched tuple
         frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(frameTuple, outTuple);
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 841ff5a..d0c9897 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -30,13 +30,11 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 
 public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   // from logical plan
@@ -51,7 +49,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   protected Tuple outTuple = null;
   protected Map<Tuple, List<Tuple>> tupleSlots;
   protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
   protected Tuple leftTuple;
   protected Tuple leftKeyTuple;
 
@@ -63,7 +60,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
 
   // projection
   protected final Projector projector;
-  protected final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -75,7 +71,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
         plan.getOutSchema(), leftChild, rightChild);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
 
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
@@ -93,7 +88,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -137,8 +131,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
           // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
           Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
           frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
           // we simulate we found a match, which is exactly the null padded one
           shouldGetLeftTuple = true;
           return outTuple;
@@ -148,10 +141,8 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
       // getting a next right tuple on in-memory hash table.
       rightTuple = iterator.next();
       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
         found = true;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index c5e2d24..842c2e2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -89,11 +89,9 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
       while (notFound && iterator.hasNext()) {
         rightTuple = iterator.next();
         frameTuple.set(leftTuple, rightTuple);
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
           notFound = false;
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index 41ff7bf..0418f65 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -18,21 +18,15 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.HavingNode;
-import org.apache.tajo.engine.planner.logical.SelectionNode;
-import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 public class HavingExec extends UnaryPhysicalExec  {
   private final EvalNode qual;
-  private final EvalContext qualCtx;
-  private final Tuple outputTuple;
 
   public HavingExec(TaskAttemptContext context,
                     HavingNode plan,
@@ -40,17 +34,14 @@ public class HavingExec extends UnaryPhysicalExec  {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
 
     this.qual = plan.getQual();
-    this.qualCtx = this.qual.newContext();
-    this.outputTuple = new VTuple(outSchema.getColumnNum());
   }
 
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
     while ((tuple = child.next()) != null) {
-      qual.eval(qualCtx, inSchema, tuple);
-      if (qual.terminate(qualCtx).asBool()) {
-          return tuple;
+      if (qual.eval(inSchema, tuple).isTrue()) {
+        return tuple;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index 446755d..1d6da3f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +40,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
   // from logical plan
   private JoinNode joinNode;
   private EvalNode joinQual;
-  private EvalContext qualCtx;
 
   // temporal tuples and states for nested loop join
   private FrameTuple frameTuple;
@@ -62,7 +60,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -78,7 +75,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
         "but there is no join condition");
     this.joinNode = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
 
     this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
     this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -93,7 +89,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -148,8 +143,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // we simulate we found a match, which is exactly the null padded one
             rightTuple = rightChild.next();
             return outTuple;
@@ -159,8 +153,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
             frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // we simulate we found a match, which is exactly the null padded one
             leftTuple = leftChild.next();
             return outTuple;
@@ -206,8 +199,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             //output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // BEFORE RETURN, MOVE FORWARD
             rightTuple = rightChild.next();
             if(rightTuple == null) {
@@ -221,8 +213,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded rightTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
             frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // we simulate we found a match, which is exactly the null padded one
             // BEFORE RETURN, MOVE FORWARD
             leftTuple = leftChild.next();
@@ -297,9 +288,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
           Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
           posRightTupleSlots = posRightTupleSlots + 1;
           frameTuple.set(leftNext, aTuple);
-          joinQual.eval(qualCtx, inSchema, frameTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          joinQual.eval(inSchema, frameTuple);
+          projector.eval(frameTuple, outTuple);
           return outTuple;
         } else {
           // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
@@ -312,9 +302,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             posLeftTupleSlots = posLeftTupleSlots + 1;
 
             frameTuple.set(leftNext, aTuple);
-            joinQual.eval(qualCtx, inSchema, frameTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            joinQual.eval(inSchema, frameTuple);
+            projector.eval(frameTuple, outTuple);
             return outTuple;
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index e128fea..e1d377e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
   // from logical plan
   private JoinNode joinNode;
   private EvalNode joinQual;
-  private EvalContext qualCtx;
 
   // temporal tuples and states for nested loop join
   private FrameTuple frameTuple;
@@ -63,7 +61,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
       PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
@@ -72,7 +69,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
         "but there is no join condition");
     this.joinNode = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
 
     this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
     this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -89,7 +85,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
     
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -165,10 +160,9 @@ public class MergeJoinExec extends BinaryPhysicalExec {
       }
 
       frameTuple.set(outerNext, innerIterator.next());
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).asBool()) {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
         return outTuple;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 055c66e..961be93 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -18,14 +18,13 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -41,10 +40,8 @@ public class NLJoinExec extends BinaryPhysicalExec {
   private Tuple outerTuple = null;
   private Tuple innerTuple = null;
   private Tuple outTuple = null;
-  private EvalContext qualCtx;
 
   // projection
-  private final EvalContext [] evalContexts;
   private final Projector projector;
 
   public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
@@ -54,12 +51,10 @@ public class NLJoinExec extends BinaryPhysicalExec {
 
     if (plan.hasJoinQual()) {
       this.joinQual = plan.getJoinQual();
-      this.qualCtx = this.joinQual.newContext();
     }
 
     // for projection
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.newContexts();
 
     // for join
     needNewOuter = true;
@@ -90,15 +85,12 @@ public class NLJoinExec extends BinaryPhysicalExec {
 
       frameTuple.set(outerTuple, innerTuple);
       if (joinQual != null) {
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) {
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+          projector.eval(frameTuple, outTuple);
           return outTuple;
         }
       } else {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(frameTuple, outTuple);
         return outTuple;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 305cdd2..4abe570 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
@@ -27,6 +25,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -41,10 +40,8 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
   private Tuple leftTuple = null;
   private Tuple rightTuple = null;
   private Tuple outTuple = null;
-  private EvalContext qualCtx;
 
   // projection
-  private final EvalContext [] evalContexts;
   private final Projector projector;
 
   private boolean foundAtLeastOneMatch;
@@ -57,12 +54,10 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
 
     if (plan.hasJoinQual()) {
       this.joinQual = plan.getJoinQual();
-      this.qualCtx = this.joinQual.newContext();
     }
 
     // for projection
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.newContexts();
 
     // for join
     needNextRightTuple = true;
@@ -96,8 +91,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
           //output a tuple with the nulls padded rightTuple
           Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
           frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
           // we simulate we found a match, which is exactly the null padded one
           foundAtLeastOneMatch = true;
           needNextRightTuple = true;
@@ -111,10 +105,9 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
       }
 
       frameTuple.set(leftTuple, rightTuple);
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).isTrue()) {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+      ;
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
         foundAtLeastOneMatch = true;
         return outTuple;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index c57089e..ecc6dd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -21,12 +21,11 @@
  */
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.planner.logical.Projectable;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.Projectable;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -35,7 +34,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
 
   // for projection
   private Tuple outTuple;
-  private EvalContext[] evalContexts;
   private Projector projector;
   
   public ProjectionExec(TaskAttemptContext context, Projectable plan,
@@ -49,7 +47,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
 
     this.outTuple = new VTuple(outSchema.getColumnNum());
     this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
-    this.evalContexts = projector.newContexts();
   }
 
   @Override
@@ -60,8 +57,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
       return null;
     }
 
-    projector.eval(evalContexts, tuple);
-    projector.terminate(evalContexts, outTuple);
+    projector.eval(tuple, outTuple);
     return outTuple;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 2f1d33d..365faba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -19,10 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
   // from logical plan
   private JoinNode joinNode;
   private EvalNode joinQual;
-  private EvalContext qualCtx;
 
   // temporal tuples and states for nested loop join
   private FrameTuple frameTuple;
@@ -61,7 +59,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -77,7 +74,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
         "but there is no join condition");
     this.joinNode = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
 
     this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
     this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -91,7 +87,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -158,8 +153,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
 
             // we simulate we found a match, which is exactly the null padded one
             rightTuple = rightChild.next();
@@ -225,8 +219,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded left tuple
             Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
 
             // we simulate we found a match, which is exactly the null padded one
             // BEFORE RETURN, MOVE FORWARD
@@ -305,9 +298,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
           posRightTupleSlots = posRightTupleSlots + 1;
 
           frameTuple.set(nextLeft, aTuple);
-          joinQual.eval(qualCtx, inSchema, frameTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          joinQual.eval(inSchema, frameTuple);
+          projector.eval(frameTuple, outTuple);
           return outTuple;
 
         } else {
@@ -321,9 +313,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
             posLeftTupleSlots = posLeftTupleSlots + 1;
 
             frameTuple.set(nextLeft, aTuple);
-            joinQual.eval(qualCtx, inSchema, frameTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            joinQual.eval(inSchema, frameTuple);
+            projector.eval(frameTuple, outTuple);
             return outTuple;
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 5158dc0..2e676e9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.SelectionNode;
 import org.apache.tajo.storage.Tuple;
@@ -28,23 +27,20 @@ import java.io.IOException;
 
 public class SelectionExec extends UnaryPhysicalExec  {
   private final EvalNode qual;
-  private final EvalContext qualCtx;
 
   public SelectionExec(TaskAttemptContext context,
                        SelectionNode plan,
                        PhysicalExec child) {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.qual = plan.getQual();
-    this.qualCtx = this.qual.newContext();
   }
 
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
     while ((tuple = child.next()) != null) {
-      qual.eval(qualCtx, inSchema, tuple);
-      if (qual.terminate(qualCtx).isTrue()) {
-          return tuple;
+      if (qual.eval(inSchema, tuple).isTrue()) {
+        return tuple;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 6d3d6b1..c38be92 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,21 +18,24 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -46,12 +49,10 @@ public class SeqScanExec extends PhysicalExec {
   private Scanner scanner = null;
 
   private EvalNode qual = null;
-  private EvalContext qualCtx;
 
   private CatalogProtos.FragmentProto [] fragments;
 
   private Projector projector;
-  private EvalContext [] evalContexts;
 
   public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
                      ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
@@ -60,12 +61,6 @@ public class SeqScanExec extends PhysicalExec {
     this.plan = plan;
     this.qual = plan.getQual();
     this.fragments = fragments;
-
-    if (qual == null) {
-      qualCtx = null;
-    } else {
-      qualCtx = this.qual.newContext();
-    }
   }
 
   /**
@@ -100,9 +95,7 @@ public class SeqScanExec extends PhysicalExec {
     // However, actual values absent in tuples. So, Replace all column references by constant datum.
     for (Column column : columnPartitionSchema.toArray()) {
       FieldEval targetExpr = new FieldEval(column);
-      EvalContext evalContext = targetExpr.newContext();
-      targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
-      Datum datum = targetExpr.terminate(evalContext);
+      Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
       ConstEval constExpr = new ConstEval(datum);
       for (Target target : plan.getTargets()) {
         if (target.getEvalTree().equals(targetExpr)) {
@@ -151,7 +144,6 @@ public class SeqScanExec extends PhysicalExec {
     }
 
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     if (fragments.length > 1) {
       this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
@@ -172,8 +164,7 @@ public class SeqScanExec extends PhysicalExec {
 
     if (!plan.hasQual()) {
       if ((tuple = scanner.next()) != null) {
-        projector.eval(evalContexts, tuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(tuple, outTuple);
         outTuple.setOffset(tuple.getOffset());
         return outTuple;
       } else {
@@ -181,10 +172,9 @@ public class SeqScanExec extends PhysicalExec {
       }
     } else {
       while ((tuple = scanner.next()) != null) {
-        qual.eval(qualCtx, inSchema, tuple);
-        if (qual.terminate(qualCtx).isTrue()) {
-          projector.eval(evalContexts, tuple);
-          projector.terminate(evalContexts, outTuple);
+
+        if (qual.eval(inSchema, tuple).isTrue()) {
+          projector.eval(tuple, outTuple);
           return outTuple;
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
index af1bf34..dbe45dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -18,82 +18,108 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 /**
- * This is the sort-based Aggregation Operator.
+ * This is the sort-based aggregation operator.
+ *
+ * <h3>Implementation</h3>
+ * Sort Aggregation has two states while running.
+ *
+ * <h4>Aggregate state</h4>
+ * If lastkey is null or lastkey is equivalent to the current key, sort aggregation is changed to this state.
+ * In this state, this operator aggregates measure values via aggregation functions.
+ *
+ * <h4>Finalize state</h4>
+ * If currentKey is different from the last key, it computes final aggregation results, and then
+ * it makes an output tuple.
  */
 public class SortAggregateExec extends AggregationExec {
-  private Tuple prevKey = null;
+  private Tuple lastKey = null;
   private boolean finished = false;
+  private FunctionContext contexts[];
 
-
-  public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan,
-                           PhysicalExec child) throws IOException {
+  public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException {
     super(context, plan, child);
+    contexts = new FunctionContext[plan.getAggFunctions().length];
   }
 
   @Override
   public Tuple next() throws IOException {
-    Tuple curKey;
+    Tuple currentKey;
     Tuple tuple;
-    Tuple finalTuple = null;
+    Tuple outputTuple = null;
+
     while(!context.isStopped() && (tuple = child.next()) != null) {
-      // build a key tuple
-      curKey = new VTuple(keylist.length);
-      for(int i = 0; i < keylist.length; i++) {
-        curKey.put(i, tuple.get(keylist[i]));
+
+      // get a key tuple
+      currentKey = new VTuple(groupingKeyIds.length);
+      for(int i = 0; i < groupingKeyIds.length; i++) {
+        currentKey.put(i, tuple.get(groupingKeyIds[i]));
       }
 
-      if (prevKey == null || prevKey.equals(curKey)) {
-        if (prevKey == null) {
-          for(int i = 0; i < outSchema.getColumnNum(); i++) {
-            evalContexts[i] = evals[i].newContext();
-            evals[i].eval(evalContexts[i], inSchema, tuple);
+      /** Aggregation State */
+      if (lastKey == null || lastKey.equals(currentKey)) {
+        if (lastKey == null) {
+          for(int i = 0; i < aggFunctionsNum; i++) {
+            contexts[i] = aggFunctions[i].newContext();
+            aggFunctions[i].merge(contexts[i], inSchema, tuple);
           }
-          prevKey = curKey;
+          lastKey = currentKey;
         } else {
           // aggregate
-          for (int idx : measureList) {
-            evals[idx].eval(evalContexts[idx], inSchema, tuple);
+          for (int i = 0; i < aggFunctionsNum; i++) {
+            aggFunctions[i].merge(contexts[i], inSchema, tuple);
           }
         }
-      } else {
+
+      } else { /** Finalization State */
         // finalize aggregate and return
-        finalTuple = new VTuple(outSchema.getColumnNum());
-        for(int i = 0; i < outSchema.getColumnNum(); i++) {
-          finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+        outputTuple = new VTuple(outSchema.getColumnNum());
+        int tupleIdx = 0;
+
+        for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+          outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+        }
+        for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+          outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
         }
 
-        for(int i = 0; i < outSchema.getColumnNum(); i++) {
-          evalContexts[i] = evals[i].newContext();
-          evals[i].eval(evalContexts[i], inSchema, tuple);
+        for(int evalIdx = 0; evalIdx < aggFunctionsNum; evalIdx++) {
+          contexts[evalIdx] = aggFunctions[evalIdx].newContext();
+          aggFunctions[evalIdx].merge(contexts[evalIdx], inSchema, tuple);
         }
-        prevKey = curKey;
-        return finalTuple;
+
+        lastKey = currentKey;
+        return outputTuple;
       }
     } // while loop
 
     if (!finished) {
-      finalTuple = new VTuple(outSchema.getColumnNum());
-      for(int i = 0; i < outSchema.getColumnNum(); i++) {
-        finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+      outputTuple = new VTuple(outSchema.getColumnNum());
+      int tupleIdx = 0;
+      for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+        outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+      }
+      for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+        outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
       }
       finished = true;
     }
-    return finalTuple;
+    return outputTuple;
   }
 
   @Override
   public void rescan() throws IOException {
     super.rescan();
 
-    prevKey = null;
+    lastKey = null;
     finished = false;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 72ff67c..041220a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -95,16 +95,13 @@ public class PartitionedTableRewriter implements RewriteRule {
   }
 
   private static class PartitionPathFilter implements PathFilter {
-    private FileSystem fs;
     private Schema schema;
     private EvalNode partitionFilter;
-    private EvalContext evalContext;
 
 
     public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
       this.schema = schema;
       this.partitionFilter = partitionFilter;
-      evalContext = partitionFilter.newContext();
     }
 
     @Override
@@ -113,8 +110,8 @@ public class PartitionedTableRewriter implements RewriteRule {
       if (tuple == null) { // if it is a file or not acceptable file
         return false;
       }
-      partitionFilter.eval(evalContext, schema, tuple);
-      return partitionFilter.terminate(evalContext).asBool();
+
+      return partitionFilter.eval(schema, tuple).asBool();
     }
 
     @Override


[3/3] git commit: TAJO-539: Change some EvalNode::eval to directly return a Datum value.

Posted by hy...@apache.org.
TAJO-539: Change some EvalNode::eval to directly return a Datum value.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/e23e78cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/e23e78cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/e23e78cc

Branch: refs/heads/master
Commit: e23e78ccd4c740fd93d2629e5b523c3f4b29c199
Parents: 0386268
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Jan 25 15:34:35 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Jan 25 15:34:45 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../eval/AggregationFunctionCallEval.java       |  41 ++---
 .../apache/tajo/engine/eval/AlgebraicUtil.java  |   4 +-
 .../tajo/engine/eval/BetweenPredicateEval.java  |  85 +++------
 .../org/apache/tajo/engine/eval/BinaryEval.java |  28 +--
 .../apache/tajo/engine/eval/CaseWhenEval.java   |  73 ++------
 .../org/apache/tajo/engine/eval/CastEval.java   |  25 +--
 .../org/apache/tajo/engine/eval/ConstEval.java  |  18 +-
 .../apache/tajo/engine/eval/EvalContext.java    |  22 ---
 .../org/apache/tajo/engine/eval/EvalNode.java   |   6 +-
 .../apache/tajo/engine/eval/EvalTreeUtil.java   |   5 +-
 .../org/apache/tajo/engine/eval/FieldEval.java  |  22 +--
 .../apache/tajo/engine/eval/FunctionEval.java   |  20 +--
 .../tajo/engine/eval/GeneralFunctionEval.java   |  20 +--
 .../org/apache/tajo/engine/eval/InEval.java     |  29 +--
 .../org/apache/tajo/engine/eval/IsNullEval.java |  30 +---
 .../org/apache/tajo/engine/eval/NotEval.java    |  20 +--
 .../tajo/engine/eval/PartialBinaryExpr.java     |  14 +-
 .../engine/eval/PatternMatchPredicateEval.java  |  30 +---
 .../tajo/engine/eval/RowConstantEval.java       |  20 +--
 .../org/apache/tajo/engine/eval/SignedEval.java |  21 +--
 .../tajo/engine/planner/ExprsVerifier.java      |  13 +-
 .../engine/planner/LogicalPlanVerifier.java     |  11 --
 .../tajo/engine/planner/LogicalPlanner.java     | 165 ++++++++++-------
 .../apache/tajo/engine/planner/PlannerUtil.java |  96 +---------
 .../engine/planner/PreLogicalPlanVerifier.java  |  69 +++++++-
 .../apache/tajo/engine/planner/Projector.java   |  20 +--
 .../engine/planner/global/GlobalPlanner.java    | 177 +++++++++++++------
 .../engine/planner/logical/GroupbyNode.java     |  70 ++++++--
 .../planner/physical/AggregationExec.java       |  64 ++-----
 .../engine/planner/physical/BNLJoinExec.java    |  19 +-
 .../planner/physical/BSTIndexScanExec.java      |  24 +--
 .../engine/planner/physical/EvalExprExec.java   |  15 +-
 .../planner/physical/HashAggregateExec.java     |  65 +++----
 .../planner/physical/HashFullOuterJoinExec.java |  20 +--
 .../engine/planner/physical/HashJoinExec.java   |  13 +-
 .../planner/physical/HashLeftAntiJoinExec.java  |   9 +-
 .../planner/physical/HashLeftOuterJoinExec.java |  21 +--
 .../planner/physical/HashLeftSemiJoinExec.java  |   6 +-
 .../engine/planner/physical/HavingExec.java     |  13 +-
 .../physical/MergeFullOuterJoinExec.java        |  29 +--
 .../engine/planner/physical/MergeJoinExec.java  |  14 +-
 .../engine/planner/physical/NLJoinExec.java     |  16 +-
 .../planner/physical/NLLeftOuterJoinExec.java   |  17 +-
 .../engine/planner/physical/ProjectionExec.java |  10 +-
 .../physical/RightOuterMergeJoinExec.java       |  23 +--
 .../engine/planner/physical/SelectionExec.java  |   8 +-
 .../engine/planner/physical/SeqScanExec.java    |  42 ++---
 .../planner/physical/SortAggregateExec.java     |  94 ++++++----
 .../rewrite/PartitionedTableRewriter.java       |   7 +-
 .../planner/rewrite/ProjectionPushDownRule.java |  77 +++++---
 .../org/apache/tajo/engine/utils/TupleUtil.java |   7 +-
 .../org/apache/tajo/master/GlobalEngine.java    |   2 +-
 .../apache/tajo/engine/eval/ExprTestBase.java   |   5 +-
 .../apache/tajo/engine/eval/TestEvalTree.java   | 144 ++++-----------
 .../tajo/engine/eval/TestEvalTreeUtil.java      |  34 ++--
 .../tajo/engine/planner/TestLogicalPlanner.java |   2 +-
 .../tajo/engine/planner/TestPlannerUtil.java    |  22 ---
 .../planner/physical/TestPhysicalPlanner.java   |  24 +--
 59 files changed, 774 insertions(+), 1229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f4fa4e3..c04b2d4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -125,6 +125,9 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-539: Change some EvalNode::eval to directly return a Datum value.
+    (hyunsik)
+
     TAJO-543: InsertNode and CreateTableNode should play their roles. (hyunsik)
 
     TAJO-409: Add explored and explained annotations to Tajo function system.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
index 290ff45..10eadce 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AggregationFunctionCallEval.java
@@ -21,10 +21,10 @@ package org.apache.tajo.engine.eval;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.function.AggFunction;
-import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.function.AggFunction;
+import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 
@@ -38,40 +38,38 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
     this.instance = instance;
   }
 
-  @Override
-  public EvalContext newContext() {
-    return new AggFunctionCtx(argEvals, instance.newContext());
+  public FunctionContext newContext() {
+    return instance.newContext();
   }
 
-  @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    AggFunctionCtx localCtx = (AggFunctionCtx) ctx;
+  public void merge(FunctionContext context, Schema schema, Tuple tuple) {
     if (params == null) {
       this.params = new VTuple(argEvals.length);
     }
 
     if (argEvals != null) {
-      params.clear();
-
       for (int i = 0; i < argEvals.length; i++) {
-        argEvals[i].eval(localCtx.argCtxs[i], schema, tuple);
-        params.put(i, argEvals[i].terminate(localCtx.argCtxs[i]));
+        params.put(i, argEvals[i].eval(schema, tuple));
       }
     }
 
     if (firstPhase) {
-      instance.eval(localCtx.funcCtx, params);
+      instance.eval(context, params);
     } else {
-      instance.merge(localCtx.funcCtx, params);
+      instance.merge(context, params);
     }
   }
 
   @Override
-  public Datum terminate(EvalContext ctx) {
+  public Datum eval(Schema schema, Tuple tuple) {
+    throw new UnsupportedOperationException("Cannot execute eval() of aggregation function");
+  }
+
+  public Datum terminate(FunctionContext context) {
     if (firstPhase) {
-      return instance.getPartialResult(((AggFunctionCtx)ctx).funcCtx);
+      return instance.getPartialResult(context);
     } else {
-      return instance.terminate(((AggFunctionCtx)ctx).funcCtx);
+      return instance.terminate(context);
     }
   }
 
@@ -91,13 +89,4 @@ public class AggregationFunctionCallEval extends FunctionEval implements Cloneab
   public void setFirstPhase() {
     this.firstPhase = true;
   }
-
-  protected class AggFunctionCtx extends FuncCallCtx {
-    FunctionContext funcCtx;
-
-    AggFunctionCtx(EvalNode [] argEvals, FunctionContext funcCtx) {
-      super(argEvals);
-      this.funcCtx = funcCtx;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
index 70ae712..6bb0160 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/AlgebraicUtil.java
@@ -157,9 +157,7 @@ public class AlgebraicUtil {
       right = eliminateConstantExprs(right);
 
       if (left.getType() == EvalType.CONST && right.getType() == EvalType.CONST) {
-        EvalContext exprCtx = expr.newContext();
-        expr.eval(exprCtx, null, null);
-        return new ConstEval(expr.terminate(exprCtx));
+        return new ConstEval(expr.eval(null, null));
       } else {
         return new BinaryEval(expr.getType(), left, right);
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
index 0215928..61dc02b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BetweenPredicateEval.java
@@ -47,7 +47,7 @@ public class BetweenPredicateEval extends EvalNode {
   }
 
   private static interface Checker {
-    void eval(BetweenContext context, Schema schema, Tuple param);
+    Datum eval(Schema schema, Tuple param);
   }
 
   private static class ConstantChecker implements Checker {
@@ -69,16 +69,14 @@ public class BetweenPredicateEval extends EvalNode {
     }
 
     @Override
-    public void eval(BetweenContext context, Schema schema, Tuple param) {
-      predicand.eval(context.predicandContext, schema, param);
-      Datum predicandValue = predicand.terminate(context.predicandContext);
+    public Datum eval(Schema schema, Tuple param) {
+      Datum predicandValue = predicand.eval(schema, param);
 
-      if (!(predicandValue instanceof NullDatum)) {
-        context.result =
-            DatumFactory.createBool(not ^ (predicandValue.greaterThanEqual(begin).asBool()
+      if (!predicandValue.isNull()) {
+        return DatumFactory.createBool(not ^ (predicandValue.greaterThanEqual(begin).asBool()
                 && predicandValue.lessThanEqual(end).asBool()));
       } else {
-        context.result = NullDatum.get();
+        return NullDatum.get();
       }
     }
   }
@@ -97,20 +95,17 @@ public class BetweenPredicateEval extends EvalNode {
     }
 
     @Override
-    public void eval(BetweenContext context, Schema schema, Tuple param) {
-      predicand.eval(context.predicandContext, schema, param);
-      Datum predicandValue = predicand.terminate(context.predicandContext);
-      begin.eval(context.beginContext, schema, param);
-      Datum beginValue = begin.terminate(context.beginContext);
-      end.eval(context.endContext, schema, param);
-      Datum endValue = begin.terminate(context.endContext);
-
-      if (!(predicandValue instanceof NullDatum || beginValue instanceof NullDatum || endValue instanceof NullDatum)) {
-        context.result =
+    public Datum eval(Schema schema, Tuple param) {
+      Datum predicandValue = predicand.eval(schema, param);
+      Datum beginValue = begin.eval(schema, param);
+      Datum endValue = end.eval(schema, param);
+
+      if (!(predicandValue.isNull() || beginValue.isNull() || endValue.isNull())) {
+        return
             DatumFactory.createBool(not ^ (predicandValue.greaterThanEqual(beginValue).asBool()
                 && predicandValue.lessThanEqual(endValue).asBool()));
       } else {
-        context.result = NullDatum.get();
+        return NullDatum.get();
       }
     }
   }
@@ -129,31 +124,23 @@ public class BetweenPredicateEval extends EvalNode {
     }
 
     @Override
-    public void eval(BetweenContext context, Schema schema, Tuple param) {
-      predicand.eval(context.predicandContext, schema, param);
-      Datum predicandValue = predicand.terminate(context.predicandContext);
-      begin.eval(context.beginContext, schema, param);
-      Datum beginValue = begin.terminate(context.beginContext);
-      end.eval(context.endContext, schema, param);
-      Datum endValue = begin.terminate(context.endContext);
-
-      if (!(predicandValue instanceof NullDatum || beginValue instanceof NullDatum || endValue instanceof NullDatum)) {
-        context.result = DatumFactory.createBool( not ^
+    public Datum eval(Schema schema, Tuple param) {
+      Datum predicandValue = predicand.eval(schema, param);
+      Datum beginValue = begin.eval(schema, param);
+      Datum endValue = end.eval(schema, param);
+
+      if (!(predicandValue.isNull()|| beginValue.isNull() || endValue.isNull())) {
+        return DatumFactory.createBool( not ^
             (predicandValue.greaterThanEqual(beginValue).asBool() && predicandValue.lessThanEqual(endValue).asBool()) ||
             (predicandValue.lessThanEqual(beginValue).asBool() && predicandValue.greaterThanEqual(endValue).asBool())
         );
       } else {
-        context.result = NullDatum.get();
+        return NullDatum.get();
       }
     }
   }
 
   @Override
-  public EvalContext newContext() {
-    return new BetweenContext();
-  }
-
-  @Override
   public TajoDataTypes.DataType getValueType() {
     return RES_TYPE;
   }
@@ -169,14 +156,14 @@ public class BetweenPredicateEval extends EvalNode {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
+  public Datum eval(Schema schema, Tuple tuple) {
     if (checker == null) {
       if (begin.getType() == EvalType.CONST && end.getType() == EvalType.CONST) {
-        Datum beginValue = begin.terminate(null);
-        Datum endValue = end.terminate(null);
+        Datum beginValue = ((ConstEval)begin).getValue();
+        Datum endValue = ((ConstEval)end).getValue();
 
         if (symmetric || beginValue.compareTo(endValue) <= 0) {
-          checker = new ConstantChecker(not, predicand, begin.terminate(null), end.terminate(null));
+          checker = new ConstantChecker(not, predicand, beginValue, endValue);
         } else {
           checker = new AsymmetricChecker(not, predicand, begin, end);
         }
@@ -189,12 +176,7 @@ public class BetweenPredicateEval extends EvalNode {
       }
     }
 
-    checker.eval((BetweenContext) ctx, schema, tuple);
-  }
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    return ((BetweenContext)ctx).result;
+    return checker.eval(schema, tuple);
   }
 
   @Override
@@ -207,19 +189,6 @@ public class BetweenPredicateEval extends EvalNode {
     return false;
   }
 
-  private class BetweenContext implements EvalContext {
-    private EvalContext predicandContext;
-    private EvalContext beginContext;
-    private EvalContext endContext;
-    private Datum result;
-
-    BetweenContext() {
-      predicandContext = predicand.newContext();
-      beginContext = begin.newContext();
-      endContext = end.newContext();
-    }
-  }
-
   @Deprecated
   public void preOrder(EvalNodeVisitor visitor) {
     visitor.visit(this);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
index ba21b4a..d362927 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/BinaryEval.java
@@ -34,11 +34,6 @@ import static org.apache.tajo.common.TajoDataTypes.Type;
 public class BinaryEval extends EvalNode implements Cloneable {
   @Expose private DataType returnType = null;
 
-  private class BinaryEvalCtx implements EvalContext {
-    EvalContext left;
-    EvalContext right;
-  }
-
   /**
    * @param type
    */
@@ -75,15 +70,6 @@ public class BinaryEval extends EvalNode implements Cloneable {
     this(expr.type, expr.leftExpr, expr.rightExpr);
   }
 
-  @Override
-  public EvalContext newContext() {
-    BinaryEvalCtx newCtx =  new BinaryEvalCtx();
-    newCtx.left = leftExpr.newContext();
-    newCtx.right = rightExpr.newContext();
-
-    return newCtx;
-  }
-
   /**
    * This is verified by ExprsVerifier.checkArithmeticOperand().
    */
@@ -134,17 +120,9 @@ public class BinaryEval extends EvalNode implements Cloneable {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    BinaryEvalCtx binCtx = (BinaryEvalCtx) ctx;
-    leftExpr.eval(binCtx == null ? null : binCtx.left, schema, tuple);
-    rightExpr.eval(binCtx == null ? null : binCtx.right, schema, tuple);
-  }
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    BinaryEvalCtx binCtx = (BinaryEvalCtx) ctx;
-    Datum lhs = leftExpr.terminate(binCtx.left);
-    Datum rhs = rightExpr.terminate(binCtx.right);
+  public Datum eval(Schema schema, Tuple tuple) {
+    Datum lhs = leftExpr.eval(schema, tuple);
+    Datum rhs = rightExpr.eval(schema, tuple);
 
     switch(type) {
     case AND:

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
index 9bbf4b4..d08bfd3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CaseWhenEval.java
@@ -25,7 +25,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.storage.Tuple;
@@ -62,11 +62,6 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
   }
 
   @Override
-  public EvalContext newContext() {
-    return new CaseContext(whens, elseResult != null ? elseResult.newContext() : null);
-  }
-
-  @Override
   public DataType getValueType() {
     return whens.get(0).getResultExpr().getValueType();
   }
@@ -76,30 +71,18 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
     return "?";
   }
 
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    CaseContext caseCtx = (CaseContext) ctx;
+  public Datum eval(Schema schema, Tuple tuple) {
     for (int i = 0; i < whens.size(); i++) {
-      whens.get(i).eval(caseCtx.contexts[i], schema, tuple);
+      if (whens.get(i).checkIfCondition(schema, tuple)) {
+        return whens.get(i).eval(schema, tuple);
+      }
     }
 
     if (elseResult != null) { // without else clause
-      elseResult.eval(caseCtx.elseCtx, schema, tuple);
+      return elseResult.eval(schema, tuple);
     }
-  }
 
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    CaseContext caseCtx = (CaseContext) ctx;
-    for (int i = 0; i < whens.size(); i++) {
-      if (whens.get(i).terminate(caseCtx.contexts[i]).asBool()) {
-        return whens.get(i).getThenResult(caseCtx.contexts[i]);
-      }
-    }
-    if (elseResult != null) { // without else clause
-      return elseResult.terminate(caseCtx.elseCtx);
-    } else {
-      return DatumFactory.createNullDatum();
-    }
+    return NullDatum.get();
   }
 
   @Override
@@ -163,11 +146,6 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
     }
 
     @Override
-    public EvalContext newContext() {
-      return new WhenContext(condition.newContext(), result.newContext());
-    }
-
-    @Override
     public DataType getValueType() {
       return CatalogUtil.newSimpleDataType(TajoDataTypes.Type.BOOLEAN);
     }
@@ -177,14 +155,12 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
       return "when?";
     }
 
-    public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-      condition.eval(((WhenContext) ctx).condCtx, schema, tuple);
-      result.eval(((WhenContext) ctx).resultCtx, schema, tuple);
+    public boolean checkIfCondition(Schema schema, Tuple tuple) {
+      return condition.eval(schema, tuple).isTrue();
     }
 
-    @Override
-    public Datum terminate(EvalContext ctx) {
-      return condition.terminate(((WhenContext) ctx).condCtx);
+    public Datum eval(Schema schema, Tuple tuple) {
+      return result.eval(schema, tuple);
     }
 
     public EvalNode getConditionExpr() {
@@ -215,20 +191,6 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
       return CoreGsonHelper.toJson(IfThenEval.this, IfThenEval.class);
     }
 
-    private class WhenContext implements EvalContext {
-      EvalContext condCtx;
-      EvalContext resultCtx;
-
-      public WhenContext(EvalContext condCtx, EvalContext resultCtx) {
-        this.condCtx = condCtx;
-        this.resultCtx = resultCtx;
-      }
-    }
-
-    public Datum getThenResult(EvalContext ctx) {
-      return result.terminate(((WhenContext) ctx).resultCtx);
-    }
-
     @Override
     public void preOrder(EvalNodeVisitor visitor) {
       visitor.visit(this);
@@ -243,17 +205,4 @@ public class CaseWhenEval extends EvalNode implements GsonObject {
       visitor.visit(this);
     }
   }
-
-  private class CaseContext implements EvalContext {
-    EvalContext [] contexts;
-    EvalContext elseCtx;
-
-    CaseContext(List<IfThenEval> whens, EvalContext elseCtx) {
-      contexts = new EvalContext[whens.size()];
-      for (int i = 0; i < whens.size(); i++) {
-        contexts[i] = whens.get(i).newContext();
-      }
-      this.elseCtx = elseCtx;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
index 033800d..9ff3df1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/CastEval.java
@@ -41,13 +41,6 @@ public class CastEval extends EvalNode {
   }
 
   @Override
-  public EvalContext newContext() {
-    CastContext castContext = new CastContext();
-    castContext.childCtx = operand.newContext();
-    return castContext;
-  }
-
-  @Override
   public DataType getValueType() {
     return target;
   }
@@ -57,15 +50,8 @@ public class CastEval extends EvalNode {
     return target.getType().name();
   }
 
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    CastContext castContext = (CastContext) ctx;
-    operand.eval(castContext.childCtx , schema, tuple);
-  }
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    CastContext castContext = (CastContext) ctx;
-    Datum operandDatum = operand.terminate(castContext.childCtx);
+  public Datum eval(Schema schema, Tuple tuple) {
+    Datum operandDatum = operand.eval(schema, tuple);
     if (operandDatum.isNull()) {
       return operandDatum;
     }
@@ -97,8 +83,7 @@ public class CastEval extends EvalNode {
       case BLOB:
         return DatumFactory.createBlob(operandDatum.asByteArray());
       default:
-        throw new InvalidCastException("Cannot cast " + operand.getValueType().getType() + " to "
-            + target.getType());
+      throw new InvalidCastException("Cannot cast " + operand.getValueType().getType() + " to " + target.getType());
     }
   }
 
@@ -127,8 +112,4 @@ public class CastEval extends EvalNode {
     operand.postOrder(visitor);
     visitor.visit(this);
   }
-
-  static class CastContext implements EvalContext {
-    EvalContext childCtx;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
index 6d7f67f..2cb530d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/ConstEval.java
@@ -21,8 +21,10 @@ package org.apache.tajo.engine.eval;
 import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
 
 public class ConstEval extends EvalNode implements Comparable<ConstEval>, Cloneable {
 	@Expose Datum datum = null;
@@ -32,17 +34,6 @@ public class ConstEval extends EvalNode implements Comparable<ConstEval>, Clonea
 		this.datum = datum;
 	}
 
-  @Override
-  public EvalContext newContext() {
-    return null;
-  }
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    return this.datum;
-  }
-
-
   public Datum getValue() {
     return this.datum;
   }
@@ -52,6 +43,11 @@ public class ConstEval extends EvalNode implements Comparable<ConstEval>, Clonea
 	}
 
   @Override
+  public Datum eval(Schema schema, Tuple tuple) {
+    return datum;
+  }
+
+  @Override
 	public DataType getValueType() {
     return CatalogUtil.newSimpleDataType(datum.type());
 	}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java
deleted file mode 100644
index 00007ce..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalContext.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.eval;
-
-public interface EvalContext {
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
index 2e797fb..5578043 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
@@ -40,8 +40,6 @@ public abstract class EvalNode implements Cloneable, GsonObject {
 		this.leftExpr = left;
 		this.rightExpr = right;
 	}
-
-  public abstract EvalContext newContext();
 	
 	public EvalType getType() {
 		return this.type;
@@ -86,9 +84,7 @@ public abstract class EvalNode implements Cloneable, GsonObject {
     return CoreGsonHelper.toJson(this, EvalNode.class);
 	}
 	
-	public void eval(EvalContext ctx, Schema schema, Tuple tuple) {}
-
-  public abstract <T extends Datum> T terminate(EvalContext ctx);
+	public abstract <T extends Datum> T eval(Schema schema, Tuple tuple);
 
   @Deprecated
 	public void preOrder(EvalNodeVisitor visitor) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
index 4044217..da05739 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/EvalTreeUtil.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.eval;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.tajo.catalog.Column;
@@ -277,10 +276,10 @@ public class EvalTreeUtil {
     }
   }
 
-  public static List<AggregationFunctionCallEval> findDistinctAggFunction(EvalNode expr) {
+  public static Set<AggregationFunctionCallEval> findDistinctAggFunction(EvalNode expr) {
     AllAggFunctionFinder finder = new AllAggFunctionFinder();
     expr.postOrder(finder);
-    return Lists.newArrayList(finder.getAggregationFunction());
+    return finder.getAggregationFunction();
   }
 
   public static class AllAggFunctionFinder implements EvalNodeVisitor {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
index dd29a3b..dc9b35b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FieldEval.java
@@ -40,32 +40,14 @@ public class FieldEval extends EvalNode implements Cloneable {
 	}
 
 	@Override
-	public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
+	public Datum eval(Schema schema, Tuple tuple) {
 	  if (fieldId == -1) {
 	    fieldId = schema.getColumnId(column.getQualifiedName());
       if (fieldId == -1) {
         throw new IllegalStateException("No Such Column Reference: " + column + ", schema: " + schema);
       }
 	  }
-    FieldEvalContext fieldCtx = (FieldEvalContext) ctx;
-	  fieldCtx.datum = tuple.get(fieldId);
-	}
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    return ((FieldEvalContext)ctx).datum;
-  }
-
-  @Override
-  public EvalContext newContext() {
-    return new FieldEvalContext();
-  }
-
-  private static class FieldEvalContext implements EvalContext {
-    private Datum datum;
-
-    public FieldEvalContext() {
-    }
+	  return tuple.get(fieldId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
index c4906d7..0555bde 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/FunctionEval.java
@@ -64,12 +64,6 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
   public boolean isDistinct() {
     return funcDesc.getFuncType() == DISTINCT_AGGREGATION || funcDesc.getFuncType() == DISTINCT_UDA;
   }
-
-  @Override
-  public EvalContext newContext() {
-    FuncCallCtx newCtx = new FuncCallCtx(argEvals);
-    return newCtx;
-  }
 	
 	public EvalNode [] getArgs() {
 	  return this.argEvals;
@@ -84,9 +78,7 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
 	}
 
 	@Override
-	public abstract void eval(EvalContext ctx, Schema schema, Tuple tuple);
-
-  public abstract Datum terminate(EvalContext ctx);
+	public abstract Datum eval(Schema schema, Tuple tuple);
 
 	@Override
 	public String getName() {
@@ -149,14 +141,4 @@ public abstract class FunctionEval extends EvalNode implements Cloneable {
 	  }
 	  visitor.visit(this);
 	}
-
-  protected class FuncCallCtx implements EvalContext {
-    EvalContext [] argCtxs;
-    FuncCallCtx(EvalNode [] argEvals) {
-      argCtxs = new EvalContext[argEvals.length];
-      for (int i = 0; i < argEvals.length; i++) {
-        argCtxs[i] = argEvals[i].newContext();
-      }
-    }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
index aeb68aa..9446d70 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/GeneralFunctionEval.java
@@ -22,17 +22,15 @@ import com.google.common.base.Objects;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.function.GeneralFunction;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.TUtil;
 
 public class GeneralFunctionEval extends FunctionEval {
   @Expose protected GeneralFunction instance;
-  private Tuple tuple;
   private Tuple params = null;
-  private Schema schema;
 
 	public GeneralFunctionEval(FunctionDesc desc, GeneralFunction instance, EvalNode[] givenArgs) {
 		super(EvalType.FUNCTION, desc, givenArgs);
@@ -44,27 +42,19 @@ public class GeneralFunctionEval extends FunctionEval {
     * @see nta.query.executor.eval.Expr#evalVal(Tuple)
     */
 	@Override
-	public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    this.schema = schema;
-    this.tuple = tuple;
-	}
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    FuncCallCtx localCtx = (FuncCallCtx) ctx;
+	public Datum eval(Schema schema, Tuple tuple) {
     if (this.params == null) {
       params = new VTuple(argEvals.length);
     }
-
     if(argEvals != null) {
       params.clear();
       for(int i=0;i < argEvals.length; i++) {
-        argEvals[i].eval(localCtx.argCtxs[i], schema, tuple);
-        params.put(i, argEvals[i].terminate(localCtx.argCtxs[i]));
+        params.put(i, argEvals[i].eval(schema, tuple));
       }
     }
+
     return instance.eval(params);
-  }
+	}
 	
 	@Override
 	public boolean equals(Object obj) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
index e113326..189e9dd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/InEval.java
@@ -25,7 +25,6 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.Tuple;
 
 public class InEval extends BinaryEval {
@@ -45,11 +44,6 @@ public class InEval extends BinaryEval {
   }
 
   @Override
-  public EvalContext newContext() {
-    return new InEvalCtx();
-  }
-
-  @Override
   public TajoDataTypes.DataType getValueType() {
     return RES_TYPE;
   }
@@ -60,37 +54,27 @@ public class InEval extends BinaryEval {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    InEvalCtx isNullCtx = (InEvalCtx) ctx;
+  public Datum eval(Schema schema, Tuple tuple) {
     if (fieldId == null) {
       fieldId = schema.getColumnId(((FieldEval)leftExpr).getColumnRef().getQualifiedName());
       values = ((RowConstantEval)rightExpr).getValues();
     }
 
-    boolean isIncluded = false;
-
     Datum value = tuple.get(fieldId);
 
     if (value.isNull()) {
-      isNullCtx.isNull = true;
-      return;
+      return value;
     }
 
+    boolean isIncluded = false;
     for (Datum datum : values) {
       if (value.equalsTo(datum).asBool()) {
         isIncluded = true;
         break;
       }
     }
-    isNullCtx.result = isIncluded;
-  }
 
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    if (((InEvalCtx)ctx).isNull) {
-      return NullDatum.get();
-    }
-    return DatumFactory.createBool(not ^ ((InEvalCtx)ctx).result);
+    return DatumFactory.createBool(not ^ isIncluded);
   }
 
   @Override
@@ -105,9 +89,4 @@ public class InEval extends BinaryEval {
   public String toString() {
     return leftExpr + " IN (" + rightExpr + ")";
   }
-
-  private class InEvalCtx implements EvalContext {
-    boolean isNull;
-    boolean result;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
index 8bb4d95..5704aa5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/IsNullEval.java
@@ -23,7 +23,6 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.BooleanDatum;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.Tuple;
@@ -42,13 +41,6 @@ public class IsNullEval extends BinaryEval {
   }
 
   @Override
-  public EvalContext newContext() {
-    IsNullEvalCtx context = new IsNullEvalCtx();
-    context.predicandContext = leftExpr.newContext();
-    return context;
-  }
-
-  @Override
   public DataType getValueType() {
     return RES_TYPE;
   }
@@ -64,16 +56,9 @@ public class IsNullEval extends BinaryEval {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    IsNullEvalCtx isNullCtx = (IsNullEvalCtx) ctx;
-    leftExpr.eval(isNullCtx.predicandContext, schema, tuple);
-    Datum result = leftExpr.terminate(((IsNullEvalCtx)ctx).predicandContext);
-    ((IsNullEvalCtx) ctx).result = DatumFactory.createBool(isNot ^ (result.type() == TajoDataTypes.Type.NULL_TYPE));
-  }
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    return ((IsNullEvalCtx)ctx).result;
+  public Datum eval(Schema schema, Tuple tuple) {
+    boolean isNull = leftExpr.eval(schema, tuple).isNull();
+    return DatumFactory.createBool(isNot ^ isNull);
   }
 
   public boolean isNot() {
@@ -96,13 +81,4 @@ public class IsNullEval extends BinaryEval {
 
     return isNullEval;
   }
-
-  private class IsNullEvalCtx implements EvalContext {
-    EvalContext predicandContext;
-    BooleanDatum result;
-
-    IsNullEvalCtx() {
-      this.result = DatumFactory.createBool(false);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
index 677708d..1a16af4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/NotEval.java
@@ -36,13 +36,6 @@ public class NotEval extends EvalNode implements Cloneable {
     this.childEval = childEval;
   }
 
-  @Override
-  public EvalContext newContext() {
-    NotEvalCtx newCtx = new NotEvalCtx();
-    newCtx.childExprCtx = childEval.newContext();
-    return newCtx;
-  }
-
   public EvalNode getChild() {
     return childEval;
   }
@@ -58,13 +51,8 @@ public class NotEval extends EvalNode implements Cloneable {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    childEval.eval(((NotEvalCtx) ctx).childExprCtx, schema, tuple);
-  }
-
-  @Override
-  public Datum terminate(EvalContext ctx) {
-    Datum datum = childEval.terminate(((NotEvalCtx) ctx).childExprCtx);
+  public Datum eval(Schema schema, Tuple tuple) {
+    Datum datum = childEval.eval(schema, tuple);
     return !datum.isNull() ? DatumFactory.createBool(!datum.asBool()) : datum;
   }
 
@@ -101,8 +89,4 @@ public class NotEval extends EvalNode implements Cloneable {
     eval.childEval = (EvalNode) this.childEval.clone();
     return eval;
   }
-
-  private class NotEvalCtx implements EvalContext {
-    EvalContext childExprCtx;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
index 73a68cd..0f4411d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PartialBinaryExpr.java
@@ -34,11 +34,6 @@ public class PartialBinaryExpr extends EvalNode {
   }
 
   @Override
-  public EvalContext newContext() {
-    return null;
-  }
-
-  @Override
   public DataType getValueType() {
     return null;
   }
@@ -49,20 +44,13 @@ public class PartialBinaryExpr extends EvalNode {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
+  public Datum eval(Schema schema, Tuple tuple) {
     throw new InvalidEvalException(
         "ERROR: the partial binary expression cannot be evluated: "
             + this.toString() );
   }
 
   @Override
-  public Datum terminate(EvalContext ctx) {
-    throw new InvalidEvalException(
-        "ERROR: the partial binary expression cannot be terminated: "
-            + this.toString() );
-  }
-
-  @Override
   public boolean equals(Object obj) {
     if (obj instanceof PartialBinaryExpr) {
       PartialBinaryExpr other = (PartialBinaryExpr) obj;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
index 568af0c..8d78b0b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/PatternMatchPredicateEval.java
@@ -39,8 +39,6 @@ public abstract class PatternMatchPredicateEval extends BinaryEval {
   @Expose protected boolean caseInsensitive;
 
   // transient variables
-  private EvalContext leftContext;
-  private boolean isNullResult = false;
   protected Pattern compiled;
 
   public PatternMatchPredicateEval(EvalType evalType, boolean not, EvalNode predicand, ConstEval pattern,
@@ -68,29 +66,17 @@ public abstract class PatternMatchPredicateEval extends BinaryEval {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    if (leftContext == null) {
-      leftContext = leftExpr.newContext();
+  public Datum eval(Schema schema, Tuple tuple) {
+    if (this.compiled == null) {
       compile(this.pattern);
     }
 
-    leftExpr.eval(leftContext, schema, tuple);
-    Datum predicand = leftExpr.terminate(leftContext);
-    isNullResult = predicand instanceof NullDatum;
-    boolean matched = compiled.matcher(predicand.asChars()).matches();
-    ((PatternMatchPredicateContext)ctx).result = matched ^ not;
-  }
-
-  public Datum terminate(EvalContext ctx) {
-    return !isNullResult ?
-        DatumFactory.createBool(((PatternMatchPredicateContext)ctx).result) : NullDatum.get();
-  }
-
-  public EvalContext newContext() {
-    return new PatternMatchPredicateContext();
-  }
+    Datum predicand = leftExpr.eval(schema, tuple);
+    if (predicand.isNull()) {
+      return NullDatum.get();
+    }
 
-  private class PatternMatchPredicateContext implements EvalContext {
-    public boolean result = false;
+    boolean matched = compiled.matcher(predicand.asChars()).matches();
+    return DatumFactory.createBool(matched ^ not);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
index 1fc6a21..646a627 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/RowConstantEval.java
@@ -20,7 +20,10 @@ package org.apache.tajo.engine.eval;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.util.TUtil;
 
 import static org.apache.tajo.common.TajoDataTypes.DataType;
@@ -34,11 +37,6 @@ public class RowConstantEval extends EvalNode {
   }
 
   @Override
-  public EvalContext newContext() {
-    return null;
-  }
-
-  @Override
   public DataType getValueType() {
     return CatalogUtil.newSimpleDataType(values[0].type());
   }
@@ -49,8 +47,12 @@ public class RowConstantEval extends EvalNode {
   }
 
   @Override
-  public Datum terminate(EvalContext ctx) {
-    return null;
+  public Datum eval(Schema schema, Tuple tuple) {
+    return NullDatum.get();
+  }
+
+  public Datum [] getValues() {
+    return values;
   }
 
   @Override
@@ -67,10 +69,6 @@ public class RowConstantEval extends EvalNode {
     return TUtil.arrayToString(values);
   }
 
-  public Datum [] getValues() {
-    return values;
-  }
-
   public void preOrder(EvalNodeVisitor visitor) {
     visitor.visit(this);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
index b0ba813..e0f50c1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/eval/SignedEval.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.eval;
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NumericDatum;
 import org.apache.tajo.storage.Tuple;
 
@@ -34,13 +35,6 @@ public class SignedEval extends EvalNode implements Cloneable {
     this.childEval = childEval;
   }
 
-  @Override
-  public EvalContext newContext() {
-    SignedEvalCtx newCtx = new SignedEvalCtx();
-    newCtx.childExprCtx = childEval.newContext();
-    return newCtx;
-  }
-
   public boolean isNegative() {
     return negative;
   }
@@ -60,13 +54,8 @@ public class SignedEval extends EvalNode implements Cloneable {
   }
 
   @Override
-  public void eval(EvalContext ctx, Schema schema, Tuple tuple) {
-    childEval.eval(((SignedEvalCtx) ctx).childExprCtx, schema, tuple);
-  }
-
-  @Override
-  public NumericDatum terminate(EvalContext ctx) {
-    NumericDatum result = childEval.terminate(((SignedEvalCtx) ctx).childExprCtx);
+  public Datum eval(Schema schema, Tuple tuple) {
+    NumericDatum result = childEval.eval(schema, tuple);
     if (negative) {
       result.inverseSign();
     }
@@ -107,8 +96,4 @@ public class SignedEval extends EvalNode implements Cloneable {
     eval.childEval = (EvalNode) this.childEval.clone();
     return eval;
   }
-
-  private class SignedEvalCtx implements EvalContext {
-    EvalContext childExprCtx;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
index a665730..c473ac1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/ExprsVerifier.java
@@ -46,6 +46,10 @@ public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalN
       return true;
     }
 
+    if (checkDateTime(dataType1) && checkDateTime(dataType2)) {
+      return true;
+    }
+
     return false;
   }
 
@@ -109,8 +113,7 @@ public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalN
   private static void checkArithmeticOperand(VerificationState state, BinaryEval evalNode) {
     EvalNode leftExpr = evalNode.getLeftExpr();
     EvalNode rightExpr = evalNode.getRightExpr();
-    if (!(checkNumericType(leftExpr.getValueType())
-        && checkNumericType(rightExpr.getValueType()))) {
+    if (!(checkNumericType(leftExpr.getValueType()) && checkNumericType(rightExpr.getValueType()))) {
       state.addVerification("No operator matches the given name and argument type(s): " + evalNode.toString());
     }
   }
@@ -125,6 +128,12 @@ public class ExprsVerifier extends BasicEvalNodeVisitor<VerificationState, EvalN
     return Type.CHAR.getNumber() < typeNumber && typeNumber <= Type.TEXT.getNumber();
   }
 
+  private static boolean checkDateTime(DataType dataType) {
+    int typeNumber = dataType.getType().getNumber();
+    return (Type.DATE.getNumber() < typeNumber && typeNumber <= Type.INTERVAL.getNumber()) ||
+        (Type.TIMEZ.getNumber() < typeNumber && typeNumber <= Type.TIMESTAMPZ.getNumber());
+  }
+
   @Override
   public EvalNode visitPlus(VerificationState state, BinaryEval evalNode, Stack<EvalNode> stack) {
     super.visitDivide(state, evalNode, stack);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
index ac76f57..9c58b7d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanVerifier.java
@@ -64,13 +64,6 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
   public LogicalNode visitGroupBy(VerificationState state, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                   GroupbyNode node, Stack<LogicalNode> stack) throws PlanningException {
     visit(state, plan, block, node.getChild(), stack);
-
-    if (node.hasTargets()) {
-      for (Target target : node.getTargets()) {
-        ExprsVerifier.verify(state, target.getEvalTree());
-      }
-    }
-
     return node;
   }
 
@@ -172,10 +165,6 @@ public class LogicalPlanVerifier extends BasicLogicalPlanVisitor<VerificationSta
                                  InsertNode node, Stack<LogicalNode> stack) throws PlanningException {
     LogicalNode child = visit(state, plan, block, node.getChild(), stack);
 
-//    if (node.hasTargetSchema()) {
-//      ensureDomains(state, node.getTargetSchema(), node.getChild().getOutSchema());
-//    }
-
     return child;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
index 8995226..066a3bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/LogicalPlanner.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.engine.planner;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,16 +35,15 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.engine.exception.VerifyException;
 import org.apache.tajo.engine.planner.LogicalPlan.QueryBlock;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.util.*;
 
-import static org.apache.tajo.algebra.Aggregation.GroupType;
 import static org.apache.tajo.algebra.CreateTable.ColumnPartition;
 import static org.apache.tajo.algebra.CreateTable.PartitionType;
 import static org.apache.tajo.engine.planner.ExprNormalizer.ExprNormalizedResult;
@@ -225,6 +223,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // It sets raw targets, all of them are raw expressions instead of references.
     setRawTargets(context, targets, referenceNames, projection);
 
+    verifyProjectedFields(block, projectionNode);
     return projectionNode;
   }
 
@@ -333,6 +332,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   private Target [] buildTargets(LogicalPlan plan, QueryBlock block, String[] referenceNames)
       throws PlanningException {
     Target [] targets = new Target[referenceNames.length];
+
     for (int i = 0; i < referenceNames.length; i++) {
       if (block.namedExprsMgr.isResolved(referenceNames[i])) {
         targets[i] = block.namedExprsMgr.getTarget(referenceNames[i]);
@@ -346,6 +346,44 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return targets;
   }
 
+  private static void verifyProjectedFields(QueryBlock block, Projectable projectable) throws PlanningException {
+    if (projectable instanceof ProjectionNode && block.hasNode(NodeType.GROUP_BY)) {
+      for (Target target : projectable.getTargets()) {
+        Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+        for (Column c : columns) {
+          if (!projectable.getInSchema().contains(c)) {
+            throw new PlanningException(c.getQualifiedName()
+                + " must appear in the GROUP BY clause or be used in an aggregate function");
+          }
+        }
+      }
+    } else  if (projectable instanceof GroupbyNode) {
+      GroupbyNode groupbyNode = (GroupbyNode) projectable;
+      for (Column c : groupbyNode.getGroupingColumns()) {
+        if (!projectable.getInSchema().contains(c)) {
+          throw new PlanningException("Cannot get such a field: " + c);
+        }
+      }
+      for (AggregationFunctionCallEval f : groupbyNode.getAggFunctions()) {
+        Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(f);
+        for (Column c : columns) {
+          if (!projectable.getInSchema().contains(c)) {
+            throw new PlanningException("Cannot get such a field: " + c);
+          }
+        }
+      }
+    } else {
+      for (Target target : projectable.getTargets()) {
+        Set<Column> columns = EvalTreeUtil.findDistinctRefColumns(target.getEvalTree());
+        for (Column c : columns) {
+          if (!projectable.getInSchema().contains(c)) {
+            throw new PlanningException("Cannot get such a field: " + c);
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Insert a group-by operator before a sort or a projection operator.
    * It is used only when a group-by clause is not given.
@@ -358,22 +396,26 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     GroupbyNode groupbyNode = new GroupbyNode(plan.newPID());
     groupbyNode.setGroupingColumns(new Column[] {});
 
-    Set<Target> evaluatedTargets = new LinkedHashSet<Target>();
+    Set<String> aggEvalNames = new LinkedHashSet<String>();
+    Set<AggregationFunctionCallEval> aggEvals = new LinkedHashSet<AggregationFunctionCallEval>();
     boolean includeDistinctFunction = false;
     for (Iterator<NamedExpr> it = block.namedExprsMgr.getUnresolvedExprs(); it.hasNext();) {
       NamedExpr rawTarget = it.next();
       try {
         includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
         EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
-        if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
+        if (evalNode.getType() == EvalType.AGG_FUNCTION) {
+          aggEvalNames.add(rawTarget.getAlias());
+          aggEvals.add((AggregationFunctionCallEval) evalNode);
           block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
-          evaluatedTargets.add(new Target(evalNode, rawTarget.getAlias()));
         }
       } catch (VerifyException ve) {
       }
     }
     groupbyNode.setDistinct(includeDistinctFunction);
-    groupbyNode.setTargets(evaluatedTargets.toArray(new Target[evaluatedTargets.size()]));
+    groupbyNode.setAggFunctions(aggEvals.toArray(new AggregationFunctionCallEval[aggEvals.size()]));
+    Target [] targets = ProjectionPushDownRule.buildGroupByTarget(groupbyNode, aggEvalNames.toArray(new String[aggEvalNames.size()]));
+    groupbyNode.setTargets(targets);
     groupbyNode.setChild(child);
     groupbyNode.setInSchema(child.getOutSchema());
 
@@ -429,7 +471,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     limitNode.setInSchema(child.getOutSchema());
     limitNode.setOutSchema(child.getOutSchema());
 
-    limitNode.setFetchFirst(firstFetNum.terminate(null).asInt8());
+    limitNode.setFetchFirst(firstFetNum.eval(null, null).asInt8());
 
     return limitNode;
   }
@@ -558,53 +600,64 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     groupingNode.setChild(child);
     groupingNode.setInSchema(child.getOutSchema());
 
+    // Set grouping sets
+    Column [] groupingColumns = new Column[aggregation.getGroupSet()[0].getGroupingSets().length];
+    for (int i = 0; i < groupingColumns.length; i++) {
+      if (block.namedExprsMgr.isResolved(groupingKeyRefNames[i])) {
+        groupingColumns[i] = block.namedExprsMgr.getTarget(groupingKeyRefNames[i]).getNamedColumn();
+      } else {
+        throw new PlanningException("Each grouping column expression must be a scalar expression.");
+      }
+    }
+    groupingNode.setGroupingColumns(groupingColumns);
+
+    ////////////////////////////////////////////////////////
+    // Visit and Build Child Plan
+    ////////////////////////////////////////////////////////
 
     // create EvalNodes and check if each EvalNode can be evaluated here.
-    Set<Target> evaluatedTargets = new LinkedHashSet<Target>();
+    List<String> aggEvalNames = TUtil.newList();
+    List<AggregationFunctionCallEval> aggEvalNodes = TUtil.newList();
     boolean includeDistinctFunction = false;
-    for (NamedExpr rawTarget : block.namedExprsMgr.getAllNamedExprs()) {
+    for (Iterator<NamedExpr> iterator = block.namedExprsMgr.getUnresolvedExprs(); iterator.hasNext();) {
+      NamedExpr namedExpr = iterator.next();
       try {
-        includeDistinctFunction = PlannerUtil.existsDistinctAggregationFunction(rawTarget.getExpr());
-        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, rawTarget.getExpr());
-        if (EvalTreeUtil.findDistinctAggFunction(evalNode).size() > 0) {
-          block.namedExprsMgr.resolveExpr(rawTarget.getAlias(), evalNode);
-          evaluatedTargets.add(new Target(evalNode, rawTarget.getAlias()));
+        includeDistinctFunction |= PlannerUtil.existsDistinctAggregationFunction(namedExpr.getExpr());
+        EvalNode evalNode = exprAnnotator.createEvalNode(context.plan, context.queryBlock, namedExpr.getExpr());
+        if (evalNode.getType() == EvalType.AGG_FUNCTION) {
+          block.namedExprsMgr.resolveExpr(namedExpr.getAlias(), evalNode);
+          aggEvalNames.add(namedExpr.getAlias());
+          aggEvalNodes.add((AggregationFunctionCallEval) evalNode);
         }
       } catch (VerifyException ve) {
       }
     }
-    // If there is at least one distinct aggregation function
+    // if there is at least one distinct aggregation function
     groupingNode.setDistinct(includeDistinctFunction);
+    groupingNode.setAggFunctions(aggEvalNodes.toArray(new AggregationFunctionCallEval[aggEvalNodes.size()]));
 
+    Target [] targets = new Target[groupingKeyNum + aggEvalNames.size()];
 
-    // Set grouping sets
-    List<Target> targets = new ArrayList<Target>();
-    Aggregation.GroupElement [] groupElements = aggregation.getGroupSet();
-
-    // Currently, single ordinary grouping set is only available.
-    if (groupElements[0].getType() == GroupType.OrdinaryGroup) {
-      Column [] groupingColumns = new Column[aggregation.getGroupSet()[0].getGroupingSets().length];
-      for (int i = 0; i < groupingColumns.length; i++) {
-        if (block.namedExprsMgr.isResolved(groupingKeyRefNames[i])) {
-          groupingColumns[i] = block.namedExprsMgr.getTarget(groupingKeyRefNames[i]).getNamedColumn();
-        } else {
-          throw new PlanningException("Each grouping column expression must be a scalar expression.");
-        }
-      }
+    // In target, grouping columns will be followed by aggregation evals.
+    //
+    // col1, col2, col3,   sum(..),  agv(..)
+    // ^^^^^^^^^^^^^^^    ^^^^^^^^^^^^^^^^^^
+    //  grouping keys      aggregation evals
 
-      for (Column column : groupingColumns) {
-        if (child.getOutSchema().contains(column)) {
-          targets.add(new Target(new FieldEval(child.getOutSchema().getColumn(column))));
-        }
-      }
-      groupingNode.setGroupingColumns(groupingColumns);
-    } else {
-      throw new InvalidQueryException("Not support grouping");
+    // Build grouping keys
+    for (int i = 0; i < groupingKeyNum; i++) {
+      Target target = new Target(new FieldEval(groupingNode.getGroupingColumns()[i]));
+      targets[i] = target;
     }
 
-    targets.addAll(evaluatedTargets);
-    groupingNode.setTargets(targets.toArray(new Target[targets.size()]));
+    for (int i = 0, targetIdx = groupingKeyNum; i < aggEvalNodes.size(); i++, targetIdx++) {
+      targets[targetIdx] = block.namedExprsMgr.getTarget(aggEvalNames.get(i));
+    }
+
+    groupingNode.setTargets(targets);
     block.unsetAggregationRequire();
+
+    verifyProjectedFields(block, groupingNode);
     return groupingNode;
   }
 
@@ -892,6 +945,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
     scanNode.setTargets(targets.toArray(new Target[targets.size()]));
 
+    verifyProjectedFields(block, scanNode);
     return scanNode;
   }
 
@@ -1487,25 +1541,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
   ===============================================================================================*/
 
   public static boolean checkIfBeEvaluatedAtGroupBy(EvalNode evalNode, GroupbyNode groupbyNode) {
-    Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
-
-    if (!groupbyNode.getInSchema().containsAll(columnRefs)) {
-      return false;
-    }
-
-    Set<String> tableIds = Sets.newHashSet();
-    // getting distinct table references
-    for (Column col : columnRefs) {
-      if (!tableIds.contains(col.getQualifier())) {
-        tableIds.add(col.getQualifier());
-      }
-    }
-
-    if (tableIds.size() > 1) {
-      return false;
-    }
-
-    return true;
+    return checkIfBeEvaluateAtThis(evalNode, groupbyNode) && evalNode.getType() == EvalType.AGG_FUNCTION;
   }
 
   public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode joinNode,
@@ -1524,11 +1560,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     // at the topmost join operator.
     // TODO - It's also valid that case-when is evalauted at the topmost outer operator.
     //        But, how can we know there is no further outer join operator after this node?
-    if (checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin)) {
-      return true;
-    } else {
-      return false;
-    }
+    return checkCaseWhenWithOuterJoin(block, evalNode, isTopMostJoin);
   }
 
   private static boolean checkCaseWhenWithOuterJoin(QueryBlock block, EvalNode evalNode, boolean isTopMostJoin) {
@@ -1564,9 +1596,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
 
   public static boolean checkIfBeEvaluateAtThis(EvalNode evalNode, LogicalNode node) {
     Set<Column> columnRefs = EvalTreeUtil.findDistinctRefColumns(evalNode);
-    if (!node.getInSchema().containsAll(columnRefs)) {
-      return false;
-    }
-    return true;
+    return node.getInSchema().containsAll(columnRefs);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index ae5f4fc..663954e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -20,9 +20,11 @@ package org.apache.tajo.engine.planner;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.ObjectArrays;
 import com.google.common.collect.Sets;
-import org.apache.tajo.algebra.*;
+import org.apache.tajo.algebra.CountRowsFunctionExpr;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.algebra.GeneralSetFunctionExpr;
+import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -30,8 +32,8 @@ import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
+import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.util.TUtil;
 
@@ -209,94 +211,6 @@ public class PlannerUtil {
     parentNode.setChild(newNode);
   }
 
-  public static GroupbyNode transformGroupbyTo2P(GroupbyNode groupBy) {
-    Preconditions.checkNotNull(groupBy);
-
-    GroupbyNode child = null;
-
-    // cloning groupby node
-    try {
-      child = (GroupbyNode) groupBy.clone();
-    } catch (CloneNotSupportedException e) {
-      e.printStackTrace();
-    }
-
-    List<Target> firstStepTargets = Lists.newArrayList();
-    Target[] secondTargets = groupBy.getTargets();
-    Target[] firstTargets = child.getTargets();
-
-    Target second;
-    Target first;
-    int targetId =  0;
-    for (int i = 0; i < firstTargets.length; i++) {
-      second = secondTargets[i];
-      first = firstTargets[i];
-
-      List<AggregationFunctionCallEval> secondStepFunctions = EvalTreeUtil.findDistinctAggFunction(second.getEvalTree());
-      List<AggregationFunctionCallEval> firstStepFunctions = EvalTreeUtil.findDistinctAggFunction(first.getEvalTree());
-
-      if (firstStepFunctions.size() == 0) {
-        firstStepTargets.add(first);
-        targetId++;
-      } else {
-        for (AggregationFunctionCallEval func : firstStepFunctions) {
-          Target newTarget;
-
-          if (func.isDistinct()) {
-            List<Column> fields = EvalTreeUtil.findAllColumnRefs(func);
-            newTarget = new Target(new FieldEval(fields.get(0)));
-            String targetName = "column_" + (targetId++);
-            newTarget.setAlias(targetName);
-
-            AggregationFunctionCallEval secondFunc = null;
-            for (AggregationFunctionCallEval sf : secondStepFunctions) {
-              if (func.equals(sf)) {
-                secondFunc = sf;
-                break;
-              }
-            }
-
-            secondFunc.setArgs(new EvalNode [] {new FieldEval(
-                new Column(targetName, newTarget.getEvalTree().getValueType()))});
-          } else {
-            func.setFirstPhase();
-            String targetName = "column_" + (targetId++);
-            newTarget = new Target(func, targetName);
-            AggregationFunctionCallEval secondFunc = null;
-            for (AggregationFunctionCallEval sf : secondStepFunctions) {
-              if (func.equals(sf)) {
-                secondFunc = sf;
-                break;
-              }
-            }
-            secondFunc.setArgs(new EvalNode [] {new FieldEval(
-                new Column(targetName, newTarget.getEvalTree().getValueType()))});
-          }
-          firstStepTargets.add(newTarget);
-        }
-      }
-
-      // Getting new target list and updating input/output schema from the new target list.
-      Target[] targetArray = firstStepTargets.toArray(new Target[firstStepTargets.size()]);
-      Schema targetSchema = PlannerUtil.targetToSchema(targetArray);
-      List<Target> newTarget = Lists.newArrayList();
-      for (Column column : groupBy.getGroupingColumns()) {
-        if (!targetSchema.containsByQualifiedName(column.getQualifiedName())) {
-          newTarget.add(new Target(new FieldEval(column)));
-        }
-      }
-      targetArray = ObjectArrays.concat(targetArray, newTarget.toArray(new Target[newTarget.size()]), Target.class);
-
-      child.setTargets(targetArray);
-      // set the groupby chaining
-      groupBy.setChild(child);
-      groupBy.setInSchema(child.getOutSchema());
-
-    }
-    return child;
-  }
-
-  
   /**
    * Find the top logical node matched to type from the given node
    * 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
index 77e2a0b..1843b5a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/PreLogicalPlanVerifier.java
@@ -18,17 +18,68 @@
 
 package org.apache.tajo.engine.planner;
 
-import org.apache.tajo.algebra.Expr;
-import org.apache.tajo.algebra.Insert;
-import org.apache.tajo.algebra.OpType;
-import org.apache.tajo.algebra.Projection;
+import org.apache.tajo.algebra.*;
+import org.apache.tajo.catalog.CatalogService;
 
 import java.util.Stack;
 
 public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationState, Expr> {
+  private CatalogService catalog;
 
-  public Expr visitInsert(VerificationState ctx, Stack<Expr> stack, Insert expr) throws PlanningException {
-    Expr child = super.visitInsert(ctx, stack, expr);
+  public PreLogicalPlanVerifier(CatalogService catalog) {
+    this.catalog = catalog;
+  }
+
+  @Override
+  public Expr visitGroupBy(VerificationState ctx, Stack<Expr> stack, Aggregation expr) throws PlanningException {
+    Expr child = super.visitGroupBy(ctx, stack, expr);
+
+    // Enforcer only ordinary grouping set.
+    for (Aggregation.GroupElement groupingElement : expr.getGroupSet()) {
+      if (groupingElement.getType() != Aggregation.GroupType.OrdinaryGroup) {
+        ctx.addVerification(groupingElement.getType() + " is not supported yet");
+      }
+    }
+
+    Projection projection = null;
+    for (Expr parent : stack) {
+      if (parent.getType() == OpType.Projection) {
+        projection = (Projection) parent;
+        break;
+      }
+    }
+
+    if (projection == null) {
+      throw new PlanningException("No Projection");
+    }
+
+    return expr;
+  }
+
+  @Override
+  public Expr visitRelation(VerificationState state, Stack<Expr> stack, Relation expr) throws PlanningException {
+    checkRelationExistence(state, expr.getName());
+    return expr;
+  }
+
+  private boolean checkRelationExistence(VerificationState state, String name) {
+    if (!catalog.existsTable(name)) {
+      state.addVerification(String.format("relation \"%s\" does not exist", name));
+      return false;
+    }
+    return true;
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+  // Insert or Update Section
+  ///////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  public Expr visitInsert(VerificationState context, Stack<Expr> stack, Insert expr) throws PlanningException {
+    Expr child = super.visitInsert(context, stack, expr);
+
+    if (expr.hasTableName()) {
+      checkRelationExistence(context, expr.getTableName());
+    }
 
     if (child != null && child.getType() == OpType.Projection) {
       if (expr.hasTargetColumns()) {
@@ -37,13 +88,13 @@ public class PreLogicalPlanVerifier extends BaseAlgebraVisitor <VerificationStat
         int targetColumnNum = expr.getTargetColumns().length;
 
         if (targetColumnNum > projectColumnNum)  {
-          ctx.addVerification("INSERT has more target columns than expressions");
+          context.addVerification("INSERT has more target columns than expressions");
         } else if (targetColumnNum < projectColumnNum) {
-          ctx.addVerification("INSERT has more expressions than target columns");
+          context.addVerification("INSERT has more expressions than target columns");
         }
       }
     }
 
-    return child;
+    return expr;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
index 8d6db22..161d39b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -19,7 +19,6 @@
 package org.apache.tajo.engine.planner;
 
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.storage.Tuple;
 
@@ -42,26 +41,11 @@ public class Projector {
     }
   }
 
-  public void eval(EvalContext[] evalContexts, Tuple in) {
+  public void eval(Tuple in, Tuple out) {
     if (targetNum > 0) {
       for (int i = 0; i < evals.length; i++) {
-        evals[i].eval(evalContexts[i], inSchema, in);
+        out.put(i, evals[i].eval(inSchema, in));
       }
     }
   }
-
-  public void terminate(EvalContext [] evalContexts, Tuple out) {
-    for (int i = 0; i < targetNum; i++) {
-      out.put(i, evals[i].terminate(evalContexts[i]));
-    }
-  }
-
-  public EvalContext [] newContexts() {
-    EvalContext [] evalContexts = new EvalContext[targetNum];
-    for (int i = 0; i < targetNum; i++) {
-      evalContexts[i] = evals[i].newContext();
-    }
-
-    return evalContexts;
-  }
 }