Posted to commits@tajo.apache.org by hy...@apache.org on 2014/01/25 07:35:02 UTC

[2/3] TAJO-539: Change some EvalNode::eval to directly return a Datum value.
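
Summary for readers skimming the diff: before this patch, most EvalNode implementations were evaluated in two steps -- the caller created an EvalContext, called eval(context, schema, tuple) to accumulate into it, and then called terminate(context) to read the result as a Datum. The patch makes eval(schema, tuple) return the Datum directly for ordinary expressions, and moves aggregate state into FunctionContext objects owned by AggregationFunctionCallEval (see the HashAggregateExec hunk below). A minimal standalone sketch of the before/after calling convention, using stand-in types rather than the actual Tajo classes:

// Illustrative only: Datum, OldEvalNode and NewEvalNode are stand-ins, not Tajo's classes.
interface Datum { boolean isTrue(); }

// Pre-patch style: evaluation writes into a per-expression context,
// and the caller reads the result back with terminate().
interface OldEvalNode {
    Object newContext();
    void eval(Object context, Object schema, Object tuple);
    Datum terminate(Object context);
}

// Post-patch style: stateless expressions simply return their value.
interface NewEvalNode {
    Datum eval(Object schema, Object tuple);
}

class CallingConventionSketch {
    static boolean oldWay(OldEvalNode qual, Object schema, Object tuple) {
        Object context = qual.newContext();      // one context object per expression
        qual.eval(context, schema, tuple);       // step 1: evaluate into the context
        return qual.terminate(context).isTrue(); // step 2: read the result back
    }

    static boolean newWay(NewEvalNode qual, Object schema, Object tuple) {
        return qual.eval(schema, tuple).isTrue(); // single call, no context allocation
    }
}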

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 341a58a..3e756d5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -31,9 +31,12 @@ import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule;
 import org.apache.tajo.storage.AbstractStorageManager;
 
 import java.io.IOException;
@@ -212,15 +215,12 @@ public class GlobalPlanner {
     // setup current block
     ExecutionBlock currentBlock = context.plan.newExecutionBlock();
     LinkedHashSet<Column> columnsForDistinct = new LinkedHashSet<Column>();
-    for (Target target : groupbyNode.getTargets()) {
-      List<AggregationFunctionCallEval> functions = EvalTreeUtil.findDistinctAggFunction(target.getEvalTree());
-      for (AggregationFunctionCallEval function : functions) {
-        if (function.isDistinct()) {
-          columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
-        } else {
-          // See the comment of this method. the aggregation function should be executed as the first phase.
-          function.setFirstPhase();
-        }
+    for (AggregationFunctionCallEval function : groupbyNode.getAggFunctions()) {
+      if (function.isDistinct()) {
+        columnsForDistinct.addAll(EvalTreeUtil.findDistinctRefColumns(function));
+      } else {
+        // See the comment of this method. The aggregation function should be executed in the first phase.
+        function.setFirstPhase();
       }
     }
 
@@ -247,65 +247,140 @@ public class GlobalPlanner {
     return currentBlock;
   }
 
-  private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock childBlock,
+  private ExecutionBlock buildGroupBy(GlobalPlanContext context, ExecutionBlock lastBlock,
                                       GroupbyNode groupbyNode) throws PlanningException {
 
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
 
-    if (groupbyNode.isDistinct()) {
-      return buildDistinctGroupBy(context, childBlock, groupbyNode);
+    if (groupbyNode.isDistinct()) { // if there is at least one distinct aggregation function
+      return buildDistinctGroupBy(context, lastBlock, groupbyNode);
     } else {
-      GroupbyNode firstPhaseGroupBy = PlannerUtil.transformGroupbyTo2P(groupbyNode);
+      GroupbyNode firstPhaseGroupby = createFirstPhaseGroupBy(masterPlan.getLogicalPlan(), groupbyNode);
 
-      if (firstPhaseGroupBy.getChild().getType() == NodeType.TABLE_SUBQUERY &&
-          ((TableSubQueryNode)firstPhaseGroupBy.getChild()).getSubQuery().getType() == NodeType.UNION) {
+      if (hasUnionChild(firstPhaseGroupby)) {
+        currentBlock = buildGroupbyAndUnionPlan(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+      } else {
+        // general hash-shuffled aggregation
+        currentBlock = buildTwoPhaseGroupby(masterPlan, lastBlock, firstPhaseGroupby, groupbyNode);
+      }
+    }
 
-        currentBlock = childBlock;
-        for (DataChannel dataChannel : masterPlan.getIncomingChannels(currentBlock.getId())) {
-          if (firstPhaseGroupBy.isEmptyGrouping()) {
-            dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
-          } else {
-            dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
-          }
-          dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+    return currentBlock;
+  }
 
-          ExecutionBlock subBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
-          GroupbyNode g1 = PlannerUtil.clone(context.plan.getLogicalPlan(), firstPhaseGroupBy);
-          g1.setChild(subBlock.getPlan());
-          subBlock.setPlan(g1);
-
-          GroupbyNode g2 = PlannerUtil.clone(context.plan.getLogicalPlan(), groupbyNode);
-          ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), dataChannel);
-          g2.setChild(scanNode);
-          currentBlock.setPlan(g2);
-        }
-      } else { // general hash-shuffled aggregation
-        childBlock.setPlan(firstPhaseGroupBy);
-        currentBlock = masterPlan.newExecutionBlock();
+  public boolean hasUnionChild(UnaryNode node) {
 
-        DataChannel channel;
-        if (firstPhaseGroupBy.isEmptyGrouping()) {
-          channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
-          channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
-        } else {
-          channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
-          channel.setShuffleKeys(firstPhaseGroupBy.getGroupingColumns());
-        }
-        channel.setSchema(firstPhaseGroupBy.getOutSchema());
-        channel.setStoreType(storeType);
+    if (node.getChild().getType() == NodeType.TABLE_SUBQUERY) {
+      TableSubQueryNode tableSubQuery = node.getChild();
+      return tableSubQuery.getSubQuery().getType() == NodeType.UNION;
+    }
+
+    return false;
+  }
 
-        ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
-        groupbyNode.setChild(scanNode);
-        groupbyNode.setInSchema(scanNode.getOutSchema());
-        currentBlock.setPlan(groupbyNode);
-        masterPlan.addConnect(channel);
+  private static ExecutionBlock buildGroupbyAndUnionPlan(MasterPlan masterPlan, ExecutionBlock lastBlock,
+                                                  GroupbyNode firstPhaseGroupBy, GroupbyNode secondPhaseGroupBy) {
+    DataChannel lastDataChannel = null;
+
+    // It pushes down the first phase group-by operator into all child blocks.
+    //
+    // (second phase)    G (currentBlock)
+    //                  /|\
+    //                / / | \
+    // (first phase) G G  G  G (child block)
+
+    // They are already connected to one another,
+    // so we don't need to connect them again.
+    for (DataChannel dataChannel : masterPlan.getIncomingChannels(lastBlock.getId())) {
+      if (firstPhaseGroupBy.isEmptyGrouping()) {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 1);
+      } else {
+        dataChannel.setShuffle(HASH_SHUFFLE, firstPhaseGroupBy.getGroupingColumns(), 32);
       }
+      dataChannel.setSchema(firstPhaseGroupBy.getOutSchema());
+      ExecutionBlock childBlock = masterPlan.getExecBlock(dataChannel.getSrcId());
+
+      // Why must firstPhaseGroupby be copied?
+      //
+    // A groupby in each execution block can have a different child,
+    // which affects the groupby's input schema.
+      GroupbyNode firstPhaseGroupbyCopy = PlannerUtil.clone(masterPlan.getLogicalPlan(), firstPhaseGroupBy);
+      firstPhaseGroupbyCopy.setChild(childBlock.getPlan());
+      childBlock.setPlan(firstPhaseGroupbyCopy);
+
+      // just keep the last data channel.
+      lastDataChannel = dataChannel;
+    }
+
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), lastDataChannel);
+    secondPhaseGroupBy.setChild(scanNode);
+    lastBlock.setPlan(secondPhaseGroupBy);
+    return lastBlock;
+  }
+
+  private ExecutionBlock buildTwoPhaseGroupby(MasterPlan masterPlan, ExecutionBlock latestBlock,
+                                                     GroupbyNode firstPhaseGroupby, GroupbyNode secondPhaseGroupby) {
+    ExecutionBlock childBlock = latestBlock;
+    childBlock.setPlan(firstPhaseGroupby);
+    ExecutionBlock currentBlock = masterPlan.newExecutionBlock();
+
+    DataChannel channel;
+    if (firstPhaseGroupby.isEmptyGrouping()) {
+      channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 1);
+      channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
+    } else {
+      channel = new DataChannel(childBlock, currentBlock, HASH_SHUFFLE, 32);
+      channel.setShuffleKeys(firstPhaseGroupby.getGroupingColumns());
     }
+    channel.setSchema(firstPhaseGroupby.getOutSchema());
+    channel.setStoreType(storeType);
+
+    ScanNode scanNode = buildInputExecutor(masterPlan.getLogicalPlan(), channel);
+    secondPhaseGroupby.setChild(scanNode);
+    secondPhaseGroupby.setInSchema(scanNode.getOutSchema());
+    currentBlock.setPlan(secondPhaseGroupby);
+
+    masterPlan.addConnect(channel);
 
     return currentBlock;
   }
 
+  public static GroupbyNode createFirstPhaseGroupBy(LogicalPlan plan, GroupbyNode groupBy) {
+    Preconditions.checkNotNull(groupBy);
+
+    GroupbyNode firstPhaseGroupBy = PlannerUtil.clone(plan, groupBy);
+    GroupbyNode secondPhaseGroupBy = groupBy;
+
+    // Set first phase expressions
+    if (secondPhaseGroupBy.hasAggFunctions()) {
+      int evalNum = secondPhaseGroupBy.getAggFunctions().length;
+      AggregationFunctionCallEval [] secondPhaseEvals = secondPhaseGroupBy.getAggFunctions();
+      AggregationFunctionCallEval [] firstPhaseEvals = new AggregationFunctionCallEval[evalNum];
+
+      String [] firstPhaseEvalNames = new String[evalNum];
+      for (int i = 0; i < evalNum; i++) {
+        try {
+          firstPhaseEvals[i] = (AggregationFunctionCallEval) secondPhaseEvals[i].clone();
+        } catch (CloneNotSupportedException e) {
+          throw new RuntimeException(e);
+        }
+
+        firstPhaseEvals[i].setFirstPhase();
+        firstPhaseEvalNames[i] = plan.newGeneratedFieldName(firstPhaseEvals[i]);
+        FieldEval param = new FieldEval(firstPhaseEvalNames[i], firstPhaseEvals[i].getValueType());
+        secondPhaseEvals[i].setArgs(new EvalNode[] {param});
+      }
+
+      secondPhaseGroupBy.setAggFunctions(secondPhaseEvals);
+      firstPhaseGroupBy.setAggFunctions(firstPhaseEvals);
+      Target [] firstPhaseTargets = ProjectionPushDownRule.buildGroupByTarget(firstPhaseGroupBy, firstPhaseEvalNames);
+      firstPhaseGroupBy.setTargets(firstPhaseTargets);
+      secondPhaseGroupBy.setInSchema(PlannerUtil.targetToSchema(firstPhaseTargets));
+    }
+    return firstPhaseGroupBy;
+  }
+
   private ExecutionBlock buildSortPlan(GlobalPlanContext context, ExecutionBlock childBlock, SortNode currentNode) {
     MasterPlan masterPlan = context.plan;
     ExecutionBlock currentBlock;
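
The planner hunks above split a non-distinct aggregation into two phases: createFirstPhaseGroupBy clones the GroupbyNode, marks the cloned functions as first-phase, and rewires the original (second-phase) functions to read the generated first-phase output fields; buildTwoPhaseGroupby then places the first phase in the child block and the second phase behind a hash-shuffle channel keyed on the grouping columns. Below is a rough standalone analogue of that two-phase idea for a simple count -- toy collections only, none of it is Tajo API:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative two-phase aggregation for COUNT: each "child block" computes partial
// counts per grouping key (first phase); the "second phase" merges the partials that
// arrive over the shuffle channel.
class TwoPhaseCountSketch {

    // First phase: runs independently in every child execution block.
    static Map<String, Long> firstPhase(List<String> keysInOneBlock) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keysInOneBlock) {
            partial.merge(key, 1L, Long::sum);
        }
        return partial;
    }

    // Second phase: merges the partial results after the hash shuffle.
    static Map<String, Long> secondPhase(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> merged.merge(key, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> block1 = firstPhase(Arrays.asList("a", "b", "a"));
        Map<String, Long> block2 = firstPhase(Arrays.asList("b", "b", "c"));
        System.out.println(secondPhase(Arrays.asList(block1, block2))); // a=2, b=3, c=1 (order may vary)
    }
}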

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
index 41f1e88..8acd32e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/logical/GroupbyNode.java
@@ -20,14 +20,22 @@ package org.apache.tajo.engine.planner.logical;
 
 import com.google.gson.annotations.Expose;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.planner.PlanString;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.util.TUtil;
 
 public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
-	@Expose private Column [] columns;
-	@Expose private Target [] targets;
+	/** Grouping key sets */
+  @Expose private Column [] groupingColumns;
+  /** Aggregation Functions */
+  @Expose private AggregationFunctionCallEval [] aggrFunctions;
+  /**
+   * The list of targets: grouping columns come first, followed by the aggregation results.
+   * aggrFunctions keeps the actual aggregation functions; this list refers to them only via field references.
+   */
+  @Expose private Target [] targets;
   @Expose private boolean hasDistinct = false;
 
   public GroupbyNode(int pid) {
@@ -35,15 +43,15 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   }
 
   public final boolean isEmptyGrouping() {
-    return columns == null || columns.length == 0;
+    return groupingColumns == null || groupingColumns.length == 0;
   }
 
   public void setGroupingColumns(Column [] groupingColumns) {
-    this.columns = groupingColumns;
+    this.groupingColumns = groupingColumns;
   }
 
 	public final Column [] getGroupingColumns() {
-	  return this.columns;
+	  return this.groupingColumns;
 	}
 
   public final boolean isDistinct() {
@@ -54,6 +62,18 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     hasDistinct = distinct;
   }
 
+  public boolean hasAggFunctions() {
+    return this.aggrFunctions != null;
+  }
+
+  public AggregationFunctionCallEval [] getAggFunctions() {
+    return this.aggrFunctions;
+  }
+
+  public void setAggFunctions(AggregationFunctionCallEval[] evals) {
+    this.aggrFunctions = evals;
+  }
+
   @Override
   public boolean hasTargets() {
     return this.targets != null;
@@ -76,9 +96,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   
   public String toString() {
     StringBuilder sb = new StringBuilder("\"GroupBy\": {\"grouping fields\":[");
-    for (int i=0; i < columns.length; i++) {
-      sb.append("\"").append(columns[i]).append("\"");
-      if(i < columns.length - 1)
+    for (int i=0; i < groupingColumns.length; i++) {
+      sb.append("\"").append(groupingColumns[i]).append("\"");
+      if(i < groupingColumns.length - 1)
         sb.append(",");
     }
 
@@ -92,6 +112,9 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
       }
       sb.append("],");
     }
+    if (aggrFunctions != null) {
+      sb.append("\n  \"expr\": ").append(TUtil.arrayToString(aggrFunctions)).append(",");
+    }
     sb.append("\n  \"out schema\": ").append(getOutSchema()).append(",");
     sb.append("\n  \"in schema\": ").append(getInSchema());
     sb.append("}");
@@ -104,7 +127,8 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
     if (obj instanceof GroupbyNode) {
       GroupbyNode other = (GroupbyNode) obj;
       boolean eq = super.equals(other);
-      eq = eq && TUtil.checkEquals(columns, other.columns);
+      eq = eq && TUtil.checkEquals(groupingColumns, other.groupingColumns);
+      eq = eq && TUtil.checkEquals(aggrFunctions, other.aggrFunctions);
       eq = eq && TUtil.checkEquals(targets, other.targets);
       return eq;
     } else {
@@ -115,10 +139,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
   @Override
   public Object clone() throws CloneNotSupportedException {
     GroupbyNode grp = (GroupbyNode) super.clone();
-    if (columns != null) {
-      grp.columns = new Column[columns.length];
-      for (int i = 0; i < columns.length; i++) {
-        grp.columns[i] = (Column) columns[i].clone();
+    if (groupingColumns != null) {
+      grp.groupingColumns = new Column[groupingColumns.length];
+      for (int i = 0; i < groupingColumns.length; i++) {
+        grp.groupingColumns[i] = (Column) groupingColumns[i].clone();
+      }
+    }
+
+    if (aggrFunctions != null) {
+      grp.aggrFunctions = new AggregationFunctionCallEval[aggrFunctions.length];
+      for (int i = 0; i < aggrFunctions.length; i++) {
+        grp.aggrFunctions[i] = (AggregationFunctionCallEval) aggrFunctions[i].clone();
       }
     }
 
@@ -138,7 +169,7 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
 
     StringBuilder sb = new StringBuilder();
     sb.append("(");
-    Column [] groupingColumns = columns;
+    Column [] groupingColumns = this.groupingColumns;
     for (int j = 0; j < groupingColumns.length; j++) {
       sb.append(groupingColumns[j].getColumnName());
       if(j < groupingColumns.length - 1) {
@@ -150,6 +181,17 @@ public class GroupbyNode extends UnaryNode implements Projectable, Cloneable {
 
     planStr.appendTitle(sb.toString());
 
+    sb = new StringBuilder();
+    sb.append("(");
+    for (int j = 0; j < aggrFunctions.length; j++) {
+      sb.append(aggrFunctions[j]);
+      if(j < aggrFunctions.length - 1) {
+        sb.append(",");
+      }
+    }
+    sb.append(")");
+    planStr.appendExplain("exprs: ").appendExplain(sb.toString());
+
     sb = new StringBuilder("target list: ");
     for (int i = 0; i < targets.length; i++) {
       sb.append(targets[i]);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
index ed82cef..eb69703 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/AggregationExec.java
@@ -18,31 +18,22 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import com.google.common.collect.Sets;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.ConstEval;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
-import org.apache.tajo.engine.eval.EvalType;
-import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
-import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Set;
 
 public abstract class AggregationExec extends UnaryPhysicalExec {
   protected GroupbyNode plan;
 
-  protected Set<Column> nonNullGroupingFields;
-  protected int keylist [];
-  protected int measureList[];
-  protected final EvalNode evals [];
-  protected EvalContext evalContexts [];
+  protected final int groupingKeyNum;
+  protected int groupingKeyIds[];
+  protected final int aggFunctionsNum;
+  protected final AggregationFunctionCallEval aggFunctions[];
+
   protected Schema evalSchema;
 
   public AggregationExec(final TaskAttemptContext context, GroupbyNode plan,
@@ -52,47 +43,26 @@ public abstract class AggregationExec extends UnaryPhysicalExec {
 
     evalSchema = plan.getOutSchema();
 
-    nonNullGroupingFields = Sets.newHashSet();
-    // keylist will contain a list of IDs of grouping column
-    keylist = new int[plan.getGroupingColumns().length];
+    final Column [] keyColumns = plan.getGroupingColumns();
+    groupingKeyNum = keyColumns.length;
+    groupingKeyIds = new int[groupingKeyNum];
     Column col;
     for (int idx = 0; idx < plan.getGroupingColumns().length; idx++) {
-      col = plan.getGroupingColumns()[idx];
-      keylist[idx] = inSchema.getColumnId(col.getQualifiedName());
-      nonNullGroupingFields.add(col);
-    }
-
-    // measureList will contain a list of measure field indexes against the target list.
-    List<Integer> measureIndexes = TUtil.newList();
-    for (int i = 0; i < plan.getTargets().length; i++) {
-      Target target = plan.getTargets()[i];
-      if (target.getEvalTree().getType() == EvalType.AGG_FUNCTION) {
-        measureIndexes.add(i);
-      }
-    }
-
-    measureList = new int[measureIndexes.size()];
-    for (int i = 0; i < measureIndexes.size(); i++) {
-      measureList[i] = measureIndexes.get(i);
+      col = keyColumns[idx];
+      groupingKeyIds[idx] = inSchema.getColumnId(col.getQualifiedName());
     }
 
-    evals = new EvalNode[plan.getTargets().length];
-    evalContexts = new EvalContext[plan.getTargets().length];
-    for (int i = 0; i < plan.getTargets().length; i++) {
-      Target t = plan.getTargets()[i];
-      if (t.getEvalTree().getType() == EvalType.FIELD && !nonNullGroupingFields.contains(t.getNamedColumn())) {
-        evals[i] = new ConstEval(DatumFactory.createNullDatum());
-        evalContexts[i] = evals[i].newContext();
-      } else {
-        evals[i] = t.getEvalTree();
-        evalContexts[i] = evals[i].newContext();
-      }
+    if (plan.hasAggFunctions()) {
+      aggFunctions = plan.getAggFunctions();
+      aggFunctionsNum = aggFunctions.length;
+    } else {
+      aggFunctions = new AggregationFunctionCallEval[0];
+      aggFunctionsNum = 0;
     }
   }
 
   @Override
   public void close() throws IOException {
     super.close();
-    nonNullGroupingFields.clear();
   }
 }
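
In the rewritten AggregationExec constructor above, the executor no longer scans the target list for measure expressions; it resolves the plan's grouping columns into positional ids in the input schema and takes the aggregation functions directly from GroupbyNode.getAggFunctions(). A tiny standalone sketch of the key-id resolution step, with a plain list standing in for Schema.getColumnId():

import java.util.Arrays;
import java.util.List;

// Toy illustration of the grouping-key resolution: translate the plan's grouping
// columns into positional ids in the input schema, so the executor can build key
// tuples by index at runtime.
class GroupingKeyIdSketch {
    static int[] resolveKeyIds(List<String> inSchemaColumnNames, String[] groupingColumns) {
        int[] ids = new int[groupingColumns.length];
        for (int i = 0; i < groupingColumns.length; i++) {
            ids[i] = inSchemaColumnNames.indexOf(groupingColumns[i]); // analogous to inSchema.getColumnId(...)
        }
        return ids;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("t.dept", "t.salary", "t.name");
        System.out.println(Arrays.toString(resolveKeyIds(schema, new String[]{"t.dept"}))); // [0]
    }
}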

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index cf56200..b39a9f1 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -18,15 +18,14 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
-import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,7 +37,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
   private final JoinNode plan;
   private final boolean hasJoinQual;
   private final EvalNode joinQual;
-  private final EvalContext qualCtx;
 
   private final List<Tuple> leftTupleSlots;
   private final List<Tuple> rightTupleSlots;
@@ -58,7 +56,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   public BNLJoinExec(final TaskAttemptContext context, final JoinNode plan,
                      final PhysicalExec leftExec, PhysicalExec rightExec) {
@@ -67,10 +64,8 @@ public class BNLJoinExec extends BinaryPhysicalExec {
     this.joinQual = plan.getJoinQual();
     if (joinQual != null) { // if join type is not 'cross join'
       hasJoinQual = true;
-      this.qualCtx = this.joinQual.newContext();
     } else {
       hasJoinQual = false;
-      this.qualCtx = null;
     }
     this.leftTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
     this.rightTupleSlots = new ArrayList<Tuple>(TUPLE_SLOT_SIZE);
@@ -85,7 +80,6 @@ public class BNLJoinExec extends BinaryPhysicalExec {
     }
 
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -191,15 +185,12 @@ public class BNLJoinExec extends BinaryPhysicalExec {
 
       frameTuple.set(leftTuple, rightIterator.next());
       if (hasJoinQual) {
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) {
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outputTuple);
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+          projector.eval(frameTuple, outputTuple);
           return outputTuple;
         }
       } else {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outputTuple);
+        projector.eval(frameTuple, outputTuple);
         return outputTuple;
       }
     }
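
The join executors above also drop the per-target EvalContext array for projection: Projector.eval(frameTuple, outputTuple) now evaluates each target once and writes the result straight into the pre-allocated output tuple. A toy sketch of that projection pattern, with stand-in types rather than Tajo's Projector:

// Illustrative only: Expr and the Object[] rows are stand-ins for Tajo's EvalNode and Tuple.
interface Expr { Object eval(Object[] inputRow); }

class ProjectorSketch {
    private final Expr[] targets;
    ProjectorSketch(Expr[] targets) { this.targets = targets; }

    // Analogous to projector.eval(frameTuple, outputTuple) in the hunks above.
    void eval(Object[] inputRow, Object[] outputRow) {
        for (int i = 0; i < targets.length; i++) {
            outputRow[i] = targets[i].eval(inputRow);
        }
    }

    public static void main(String[] args) {
        ProjectorSketch p = new ProjectorSketch(new Expr[] {
                row -> row[1],                   // project the second input column
                row -> ((Integer) row[0]) + 1    // a computed column
        });
        Object[] out = new Object[2];
        p.eval(new Object[] {41, "dept-a"}, out);
        System.out.println(java.util.Arrays.toString(out)); // [dept-a, 42]
    }
}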

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 30c1fa7..2ff6fc9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -19,16 +19,15 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -37,11 +36,9 @@ public class BSTIndexScanExec extends PhysicalExec {
   private SeekableScanner fileScanner;
   
   private EvalNode qual;
-  private EvalContext qualCtx;
   private BSTIndex.BSTIndexReader reader;
   
   private final Projector projector;
-  private EvalContext [] evalContexts;
   
   private Datum[] datum = null;
   
@@ -54,18 +51,12 @@ public class BSTIndexScanExec extends PhysicalExec {
     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
     this.scanNode = scanNode;
     this.qual = scanNode.getQual();
-    if(this.qual == null) {
-      this.qualCtx = null;
-    } else {
-      this.qualCtx = this.qual.newContext();
-    }
     this.datum = datum;
 
     this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
         scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
     this.fileScanner.init();
     this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
-    this.evalContexts = projector.newContexts();
 
     this.reader = new BSTIndex(sm.getFileSystem().getConf()).
         getIndexReader(fileName, keySchema, comparator);
@@ -109,18 +100,15 @@ public class BSTIndexScanExec extends PhysicalExec {
     Tuple outTuple = new VTuple(this.outSchema.getColumnNum());
     if (!scanNode.hasQual()) {
       if ((tuple = fileScanner.next()) != null) {
-        projector.eval(evalContexts, tuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(tuple, outTuple);
         return outTuple;
       } else {
         return null;
       }
     } else {
-       while( reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
-         qual.eval(qualCtx, inSchema, tuple);
-         if (qual.terminate(qualCtx).asBool()) {
-           projector.eval(evalContexts, tuple);
-           projector.terminate(evalContexts, outTuple);
+       while(reader.isCurInMemory() && (tuple = fileScanner.next()) != null) {
+         if (qual.eval(inSchema, tuple).isTrue()) {
+           projector.eval(tuple, outTuple);
            return outTuple;
          } else {
            fileScanner.seek(reader.next());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
index 4480747..83580f9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/EvalExprExec.java
@@ -18,43 +18,32 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.EvalExprNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 public class EvalExprExec extends PhysicalExec {
   private final EvalExprNode plan;
-  private final EvalContext[] evalContexts;
 
   public EvalExprExec(final TaskAttemptContext context, final EvalExprNode plan) {
     super(context, plan.getInSchema(), plan.getOutSchema());
     this.plan = plan;
-
-    evalContexts = new EvalContext[plan.getTargets().length];
-    for (int i = 0; i < plan.getTargets().length; i++) {
-      evalContexts[i] = plan.getTargets()[i].getEvalTree().newContext();
-    }
   }
 
   @Override
   public void init() throws IOException {
   }
 
-  /* (non-Javadoc)
-  * @see PhysicalExec#next()
-  */
   @Override
   public Tuple next() throws IOException {    
     Target [] targets = plan.getTargets();
     Tuple t = new VTuple(targets.length);
     for (int i = 0; i < targets.length; i++) {
-      targets[i].getEvalTree().eval(evalContexts[i], inSchema, null);
-      t.put(i, targets[i].getEvalTree().terminate(evalContexts[i]));
+      t.put(i, targets[i].getEvalTree().eval(inSchema, null));
     }
     return t;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
index ed2f275..d533b82 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashAggregateExec.java
@@ -18,7 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalContext;
+import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
@@ -35,44 +35,38 @@ import java.util.Map.Entry;
  */
 public class HashAggregateExec extends AggregationExec {
   private Tuple tuple = null;
-  private Map<Tuple, EvalContext[]> tupleSlots;
+  private Map<Tuple, FunctionContext[]> hashTable;
   private boolean computed = false;
-  private Iterator<Entry<Tuple, EvalContext []>> iterator = null;
+  private Iterator<Entry<Tuple, FunctionContext []>> iterator = null;
 
-  /**
-   * @throws java.io.IOException
-	 * 
-	 */
-  public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode annotation,
-                           PhysicalExec subOp) throws IOException {
-    super(ctx, annotation, subOp);
-    tupleSlots = new HashMap<Tuple, EvalContext[]>(10000);
-    this.tuple = new VTuple(evalSchema.getColumnNum());
+  public HashAggregateExec(TaskAttemptContext ctx, GroupbyNode plan, PhysicalExec subOp) throws IOException {
+    super(ctx, plan, subOp);
+    hashTable = new HashMap<Tuple, FunctionContext []>(100000);
+    this.tuple = new VTuple(plan.getOutSchema().getColumnNum());
   }
   
   private void compute() throws IOException {
     Tuple tuple;
     Tuple keyTuple;
-    int targetLength = plan.getTargets().length;
     while((tuple = child.next()) != null && !context.isStopped()) {
-      keyTuple = new VTuple(keylist.length);
+      keyTuple = new VTuple(groupingKeyIds.length);
       // build one key tuple
-      for(int i = 0; i < keylist.length; i++) {
-        keyTuple.put(i, tuple.get(keylist[i]));
+      for(int i = 0; i < groupingKeyIds.length; i++) {
+        keyTuple.put(i, tuple.get(groupingKeyIds[i]));
       }
       
-      if(tupleSlots.containsKey(keyTuple)) {
-        EvalContext [] tmpTuple = tupleSlots.get(keyTuple);
-        for(int i = 0; i < measureList.length; i++) {
-          evals[measureList[i]].eval(tmpTuple[measureList[i]], inSchema, tuple);
+      if(hashTable.containsKey(keyTuple)) {
+        FunctionContext [] contexts = hashTable.get(keyTuple);
+        for(int i = 0; i < aggFunctions.length; i++) {
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
         }
       } else { // if the key occurs firstly
-        EvalContext evalCtx [] = new EvalContext[targetLength];
-        for(int i = 0; i < targetLength; i++) {
-          evalCtx[i] = evals[i].newContext();
-          evals[i].eval(evalCtx[i], inSchema, tuple);
+        FunctionContext contexts [] = new FunctionContext[aggFunctionsNum];
+        for(int i = 0; i < aggFunctionsNum; i++) {
+          contexts[i] = aggFunctions[i].newContext();
+          aggFunctions[i].merge(contexts[i], inSchema, tuple);
         }
-        tupleSlots.put(keyTuple, evalCtx);
+        hashTable.put(keyTuple, contexts);
       }
     }
   }
@@ -81,16 +75,23 @@ public class HashAggregateExec extends AggregationExec {
   public Tuple next() throws IOException {
     if(!computed) {
       compute();
-      iterator = tupleSlots.entrySet().iterator();
+      iterator = hashTable.entrySet().iterator();
       computed = true;
     }
 
-    EvalContext [] ctx;
+    FunctionContext [] contexts;
 
     if (iterator.hasNext()) {
-      ctx =  iterator.next().getValue();
-      for (int i = 0; i < ctx.length; i++) {
-        tuple.put(i, evals[i].terminate(ctx[i]));
+      Entry<Tuple, FunctionContext []> entry = iterator.next();
+      Tuple keyTuple = entry.getKey();
+      contexts =  entry.getValue();
+
+      int tupleIdx = 0;
+      for (; tupleIdx < groupingKeyNum; tupleIdx++) {
+        tuple.put(tupleIdx, keyTuple.get(tupleIdx));
+      }
+      for (int funcIdx = 0; funcIdx < aggFunctionsNum; funcIdx++, tupleIdx++) {
+        tuple.put(tupleIdx, aggFunctions[funcIdx].terminate(contexts[funcIdx]));
       }
 
       return tuple;
@@ -101,12 +102,12 @@ public class HashAggregateExec extends AggregationExec {
 
   @Override
   public void rescan() throws IOException {    
-    iterator = tupleSlots.entrySet().iterator();
+    iterator = hashTable.entrySet().iterator();
   }
 
   @Override
   public void close() throws IOException {
     super.close();
-    tupleSlots.clear();
+    hashTable.clear();
   }
 }
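
The HashAggregateExec rewrite above keeps one FunctionContext per aggregation function for every distinct grouping key, merges each input tuple into those contexts, and, when emitting results, writes the grouping key columns first followed by one terminate() value per function. A compact standalone analogue of that flow with a single count function -- toy types only, not the Tajo classes:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy analogue of the rewritten HashAggregateExec: an accumulator ("context") per
// aggregate function is kept for every distinct grouping key; the output row is the
// key columns followed by the terminated aggregate values.
class HashAggregateSketch {
    interface AggFunction<C> {
        C newContext();
        void merge(C context, Object value); // analogous to AggregationFunctionCallEval.merge(...)
        Object terminate(C context);         // analogous to terminate(context)
    }

    static final AggFunction<long[]> COUNT = new AggFunction<long[]>() {
        public long[] newContext() { return new long[1]; }
        public void merge(long[] context, Object value) { context[0]++; }
        public Object terminate(long[] context) { return context[0]; }
    };

    public static void main(String[] args) {
        List<Object[]> rows = Arrays.asList(
                new Object[]{"dept-a", 10}, new Object[]{"dept-b", 20}, new Object[]{"dept-a", 30});

        Map<Object, long[]> hashTable = new HashMap<>(); // grouping key -> per-function contexts
        for (Object[] row : rows) {
            long[] context = hashTable.computeIfAbsent(row[0], k -> COUNT.newContext());
            COUNT.merge(context, row[1]);
        }

        for (Map.Entry<Object, long[]> entry : hashTable.entrySet()) {
            // key columns first, then aggregate results, as in the patched next()
            System.out.println(Arrays.asList(entry.getKey(), COUNT.terminate(entry.getValue())));
        }
    }
}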

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index caea5d9..848e362 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -18,9 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
@@ -48,7 +47,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
   protected Tuple outTuple = null;
   protected Map<Tuple, List<Tuple>> tupleSlots;
   protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
   protected Tuple leftTuple;
   protected Tuple leftKeyTuple;
 
@@ -60,7 +58,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
 
   // projection
   protected final Projector projector;
-  protected final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -72,7 +69,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
         plan.getOutSchema(), outer, inner);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
 
     // this hashmap mirrors the evolution of the tupleSlots, with the same keys. For each join key,
@@ -95,7 +91,6 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -156,8 +151,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
           } else {
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, unmatchedRightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
 
             return outTuple;
           }
@@ -173,8 +167,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
           //output a tuple with the nulls padded rightTuple
           Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
           frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
           // we simulate we found a match, which is exactly the null padded one
           shouldGetLeftTuple = true;
           return outTuple;
@@ -184,10 +177,9 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
       // getting a next right tuple on in-memory hash table.
       rightTuple = iterator.next();
       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple); //?? isn't it always true if hash function is identity function??
-      if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
         found = true;
         getKeyLeftTuple(leftTuple, leftKeyTuple);
         matched.put(leftKeyTuple, true);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index e08e07d..08b7035 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -18,9 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -29,6 +27,7 @@ import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
@@ -46,7 +45,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
   protected Tuple outTuple = null;
   protected Map<Tuple, List<Tuple>> tupleSlots;
   protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
   protected Tuple leftTuple;
   protected Tuple leftKeyTuple;
 
@@ -58,7 +56,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
 
   // projection
   protected final Projector projector;
-  protected final EvalContext [] evalContexts;
 
   public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec,
       PhysicalExec rightExec) {
@@ -66,7 +63,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
         leftExec, rightExec);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
 
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual,
@@ -85,7 +81,6 @@ public class HashJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -131,10 +126,8 @@ public class HashJoinExec extends BinaryPhysicalExec {
       // getting a next right tuple on in-memory hash table.
       rightTuple = iterator.next();
       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).asBool()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
         found = true;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
index 6b2d7b8..d7437c2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftAntiJoinExec.java
@@ -80,8 +80,7 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
       } else {
         // if not found, it returns a tuple.
         frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(frameTuple, outTuple);
         return outTuple;
       }
 
@@ -91,16 +90,14 @@ public class HashLeftAntiJoinExec extends HashJoinExec {
       while (notFound && iterator.hasNext()) {
         rightTuple = iterator.next();
         frameTuple.set(leftTuple, rightTuple);
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
           notFound = false;
         }
       }
 
       if (notFound) { // if there is no matched tuple
         frameTuple.set(leftTuple, rightNullTuple);
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(frameTuple, outTuple);
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index 841ff5a..d0c9897 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -18,9 +18,9 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.Column;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -30,13 +30,11 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.*;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 
 public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   // from logical plan
@@ -51,7 +49,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
   protected Tuple outTuple = null;
   protected Map<Tuple, List<Tuple>> tupleSlots;
   protected Iterator<Tuple> iterator = null;
-  protected EvalContext qualCtx;
   protected Tuple leftTuple;
   protected Tuple leftKeyTuple;
 
@@ -63,7 +60,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
 
   // projection
   protected final Projector projector;
-  protected final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -75,7 +71,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
         plan.getOutSchema(), leftChild, rightChild);
     this.plan = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = joinQual.newContext();
     this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000);
 
     this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema());
@@ -93,7 +88,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -137,8 +131,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
           // this left tuple doesn't have a match on the right, and output a tuple with the nulls padded rightTuple
           Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
           frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
           // we simulate we found a match, which is exactly the null padded one
           shouldGetLeftTuple = true;
           return outTuple;
@@ -148,10 +141,8 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
       // getting a next right tuple on in-memory hash table.
       rightTuple = iterator.next();
       frameTuple.set(leftTuple, rightTuple); // evaluate a join condition on both tuples
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).isTrue()) { // if both tuples are joinable
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if both tuples are joinable
+        projector.eval(frameTuple, outTuple);
         found = true;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index c5e2d24..842c2e2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -89,11 +89,9 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
       while (notFound && iterator.hasNext()) {
         rightTuple = iterator.next();
         frameTuple.set(leftTuple, rightTuple);
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) { // if the matched one is found
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) { // if the matched one is found
           notFound = false;
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
index 41ff7bf..0418f65 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/HavingExec.java
@@ -18,21 +18,15 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.HavingNode;
-import org.apache.tajo.engine.planner.logical.SelectionNode;
-import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 public class HavingExec extends UnaryPhysicalExec  {
   private final EvalNode qual;
-  private final EvalContext qualCtx;
-  private final Tuple outputTuple;
 
   public HavingExec(TaskAttemptContext context,
                     HavingNode plan,
@@ -40,17 +34,14 @@ public class HavingExec extends UnaryPhysicalExec  {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
 
     this.qual = plan.getQual();
-    this.qualCtx = this.qual.newContext();
-    this.outputTuple = new VTuple(outSchema.getColumnNum());
   }
 
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
     while ((tuple = child.next()) != null) {
-      qual.eval(qualCtx, inSchema, tuple);
-      if (qual.terminate(qualCtx).asBool()) {
-          return tuple;
+      if (qual.eval(inSchema, tuple).isTrue()) {
+        return tuple;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index 446755d..1d6da3f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -41,7 +40,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
   // from logical plan
   private JoinNode joinNode;
   private EvalNode joinQual;
-  private EvalContext qualCtx;
 
   // temporal tuples and states for nested loop join
   private FrameTuple frameTuple;
@@ -62,7 +60,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -78,7 +75,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
         "but there is no join condition");
     this.joinNode = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
 
     this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
     this.rightTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -93,7 +89,6 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -148,8 +143,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // we simulate we found a match, which is exactly the null padded one
             rightTuple = rightChild.next();
             return outTuple;
@@ -159,8 +153,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
             frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // we simulate we found a match, which is exactly the null padded one
             leftTuple = leftChild.next();
             return outTuple;
@@ -206,8 +199,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             //output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // BEFORE RETURN, MOVE FORWARD
             rightTuple = rightChild.next();
             if(rightTuple == null) {
@@ -221,8 +213,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded rightTuple
             Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
             frameTuple.set(leftTuple, nullPaddedTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
             // we simulate we found a match, which is exactly the null padded one
             // BEFORE RETURN, MOVE FORWARD
             leftTuple = leftChild.next();
@@ -297,9 +288,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
           Tuple aTuple = new VTuple(rightTupleSlots.get(posRightTupleSlots));
           posRightTupleSlots = posRightTupleSlots + 1;
           frameTuple.set(leftNext, aTuple);
-          joinQual.eval(qualCtx, inSchema, frameTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          joinQual.eval(inSchema, frameTuple);
+          projector.eval(frameTuple, outTuple);
           return outTuple;
         } else {
           // right (inner) slots reached end and should be rewind if there are still tuples in the outer slots
@@ -312,9 +302,8 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
             posLeftTupleSlots = posLeftTupleSlots + 1;
 
             frameTuple.set(leftNext, aTuple);
-            joinQual.eval(qualCtx, inSchema, frameTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            joinQual.eval(inSchema, frameTuple);
+            projector.eval(frameTuple, outTuple);
             return outTuple;
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index e128fea..e1d377e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -19,9 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -30,6 +28,7 @@ import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
   // from logical plan
   private JoinNode joinNode;
   private EvalNode joinQual;
-  private EvalContext qualCtx;
 
   // temporal tuples and states for nested loop join
   private FrameTuple frameTuple;
@@ -63,7 +61,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   public MergeJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
       PhysicalExec inner, SortSpec[] outerSortKey, SortSpec[] innerSortKey) {
@@ -72,7 +69,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
         "but there is no join condition");
     this.joinNode = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
 
     this.outerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
     this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -89,7 +85,6 @@ public class MergeJoinExec extends BinaryPhysicalExec {
     
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -165,10 +160,9 @@ public class MergeJoinExec extends BinaryPhysicalExec {
       }
 
       frameTuple.set(outerNext, innerIterator.next());
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).asBool()) {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
         return outTuple;
       }
     }
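
The MergeJoinExec hunk above shows the heart of this change: the stateful newContext/eval/terminate protocol on EvalNode collapses into a single eval call that returns the resulting Datum. A minimal before/after sketch of a predicate check (qual, inSchema and tuple stand for any predicate, its input schema and an input row; this is illustrative, not code from the patch):

    // before: evaluation state lives in a separate EvalContext
    EvalContext ctx = qual.newContext();
    qual.eval(ctx, inSchema, tuple);
    if (qual.terminate(ctx).isTrue()) { /* emit the row */ }

    // after: eval returns the resulting Datum directly, so no per-call context is needed
    if (qual.eval(inSchema, tuple).isTrue()) { /* emit the row */ }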

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 055c66e..961be93 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -18,14 +18,13 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -41,10 +40,8 @@ public class NLJoinExec extends BinaryPhysicalExec {
   private Tuple outerTuple = null;
   private Tuple innerTuple = null;
   private Tuple outTuple = null;
-  private EvalContext qualCtx;
 
   // projection
-  private final EvalContext [] evalContexts;
   private final Projector projector;
 
   public NLJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer,
@@ -54,12 +51,10 @@ public class NLJoinExec extends BinaryPhysicalExec {
 
     if (plan.hasJoinQual()) {
       this.joinQual = plan.getJoinQual();
-      this.qualCtx = this.joinQual.newContext();
     }
 
     // for projection
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.newContexts();
 
     // for join
     needNewOuter = true;
@@ -90,15 +85,12 @@ public class NLJoinExec extends BinaryPhysicalExec {
 
       frameTuple.set(outerTuple, innerTuple);
       if (joinQual != null) {
-        joinQual.eval(qualCtx, inSchema, frameTuple);
-        if (joinQual.terminate(qualCtx).asBool()) {
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+        if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+          projector.eval(frameTuple, outTuple);
           return outTuple;
         }
       } else {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(frameTuple, outTuple);
         return outTuple;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 305cdd2..4abe570 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.logical.JoinNode;
@@ -27,6 +25,7 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -41,10 +40,8 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
   private Tuple leftTuple = null;
   private Tuple rightTuple = null;
   private Tuple outTuple = null;
-  private EvalContext qualCtx;
 
   // projection
-  private final EvalContext [] evalContexts;
   private final Projector projector;
 
   private boolean foundAtLeastOneMatch;
@@ -57,12 +54,10 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
 
     if (plan.hasJoinQual()) {
       this.joinQual = plan.getJoinQual();
-      this.qualCtx = this.joinQual.newContext();
     }
 
     // for projection
     projector = new Projector(inSchema, outSchema, plan.getTargets());
-    evalContexts = projector.newContexts();
 
     // for join
     needNextRightTuple = true;
@@ -96,8 +91,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
           //output a tuple with the nulls padded rightTuple
           Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
           frameTuple.set(leftTuple, nullPaddedTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          projector.eval(frameTuple, outTuple);
           // we simulate we found a match, which is exactly the null padded one
           foundAtLeastOneMatch = true;
           needNextRightTuple = true;
@@ -111,10 +105,8 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
       }
 
       frameTuple.set(leftTuple, rightTuple);
-      joinQual.eval(qualCtx, inSchema, frameTuple);
-      if (joinQual.terminate(qualCtx).isTrue()) {
-        projector.eval(evalContexts, frameTuple);
-        projector.terminate(evalContexts, outTuple);
+      if (joinQual.eval(inSchema, frameTuple).isTrue()) {
+        projector.eval(frameTuple, outTuple);
         foundAtLeastOneMatch = true;
         return outTuple;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index c57089e..ecc6dd0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -21,12 +21,11 @@
  */
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.planner.logical.Projectable;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.logical.Projectable;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
@@ -35,7 +34,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
 
   // for projection
   private Tuple outTuple;
-  private EvalContext[] evalContexts;
   private Projector projector;
   
   public ProjectionExec(TaskAttemptContext context, Projectable plan,
@@ -49,7 +47,6 @@ public class ProjectionExec extends UnaryPhysicalExec {
 
     this.outTuple = new VTuple(outSchema.getColumnNum());
     this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
-    this.evalContexts = projector.newContexts();
   }
 
   @Override
@@ -60,8 +57,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
       return null;
     }
 
-    projector.eval(evalContexts, tuple);
-    projector.terminate(evalContexts, outTuple);
+    projector.eval(tuple, outTuple);
     return outTuple;
   }
 }
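
Projector follows the same pattern: the EvalContext array obtained from projector.newContexts() and the eval/terminate pair are replaced by a single eval that writes into a caller-supplied output tuple. A short usage sketch under the same assumptions as the class above (plan, inSchema, outSchema and inputTuple are whatever the enclosing executor provides; the names are illustrative):

    Projector projector = new Projector(inSchema, outSchema, plan.getTargets());
    Tuple outTuple = new VTuple(outSchema.getColumnNum());
    // evaluate every target expression against 'inputTuple' and store the results in 'outTuple'
    projector.eval(inputTuple, outTuple);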

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 2f1d33d..365faba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -19,10 +19,8 @@
 package org.apache.tajo.engine.planner.physical;
 
 import com.google.common.base.Preconditions;
-import org.apache.tajo.worker.TaskAttemptContext;
 import org.apache.tajo.catalog.SortSpec;
 import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -31,6 +29,7 @@ import org.apache.tajo.storage.FrameTuple;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,7 +39,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
   // from logical plan
   private JoinNode joinNode;
   private EvalNode joinQual;
-  private EvalContext qualCtx;
 
   // temporal tuples and states for nested loop join
   private FrameTuple frameTuple;
@@ -61,7 +59,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
 
   // projection
   private final Projector projector;
-  private final EvalContext [] evalContexts;
 
   private int rightNumCols;
   private int leftNumCols;
@@ -77,7 +74,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
         "but there is no join condition");
     this.joinNode = plan;
     this.joinQual = plan.getJoinQual();
-    this.qualCtx = this.joinQual.newContext();
 
     this.leftTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
     this.innerTupleSlots = new ArrayList<Tuple>(INITIAL_TUPLE_SLOT);
@@ -91,7 +87,6 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
 
     // for projection
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     // for join
     frameTuple = new FrameTuple();
@@ -158,8 +153,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded leftTuple
             Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
 
             // we simulate we found a match, which is exactly the null padded one
             rightTuple = rightChild.next();
@@ -225,8 +219,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
             // output a tuple with the nulls padded left tuple
             Tuple nullPaddedTuple = createNullPaddedTuple(leftNumCols);
             frameTuple.set(nullPaddedTuple, rightTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            projector.eval(frameTuple, outTuple);
 
             // we simulate we found a match, which is exactly the null padded one
             // BEFORE RETURN, MOVE FORWARD
@@ -305,9 +298,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
           posRightTupleSlots = posRightTupleSlots + 1;
 
           frameTuple.set(nextLeft, aTuple);
-          joinQual.eval(qualCtx, inSchema, frameTuple);
-          projector.eval(evalContexts, frameTuple);
-          projector.terminate(evalContexts, outTuple);
+          joinQual.eval(inSchema, frameTuple);
+          projector.eval(frameTuple, outTuple);
           return outTuple;
 
         } else {
@@ -321,9 +313,8 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
             posLeftTupleSlots = posLeftTupleSlots + 1;
 
             frameTuple.set(nextLeft, aTuple);
-            joinQual.eval(qualCtx, inSchema, frameTuple);
-            projector.eval(evalContexts, frameTuple);
-            projector.terminate(evalContexts, outTuple);
+            joinQual.eval(inSchema, frameTuple);
+            projector.eval(frameTuple, outTuple);
             return outTuple;
           }
         }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 5158dc0..2e676e9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.SelectionNode;
 import org.apache.tajo.storage.Tuple;
@@ -28,23 +27,20 @@ import java.io.IOException;
 
 public class SelectionExec extends UnaryPhysicalExec  {
   private final EvalNode qual;
-  private final EvalContext qualCtx;
 
   public SelectionExec(TaskAttemptContext context,
                        SelectionNode plan,
                        PhysicalExec child) {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.qual = plan.getQual();
-    this.qualCtx = this.qual.newContext();
   }
 
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
     while ((tuple = child.next()) != null) {
-      qual.eval(qualCtx, inSchema, tuple);
-      if (qual.terminate(qualCtx).isTrue()) {
-          return tuple;
+      if (qual.eval(inSchema, tuple).isTrue()) {
+        return tuple;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 6d3d6b1..c38be92 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -18,21 +18,24 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.engine.eval.*;
-import org.apache.tajo.engine.utils.TupleUtil;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.FragmentConvertor;
-import org.apache.tajo.worker.TaskAttemptContext;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.eval.ConstEval;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
+import org.apache.tajo.engine.eval.FieldEval;
+import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
 import org.apache.tajo.engine.planner.Target;
-import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -46,12 +49,10 @@ public class SeqScanExec extends PhysicalExec {
   private Scanner scanner = null;
 
   private EvalNode qual = null;
-  private EvalContext qualCtx;
 
   private CatalogProtos.FragmentProto [] fragments;
 
   private Projector projector;
-  private EvalContext [] evalContexts;
 
   public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
                      ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
@@ -60,12 +61,6 @@ public class SeqScanExec extends PhysicalExec {
     this.plan = plan;
     this.qual = plan.getQual();
     this.fragments = fragments;
-
-    if (qual == null) {
-      qualCtx = null;
-    } else {
-      qualCtx = this.qual.newContext();
-    }
   }
 
   /**
@@ -100,9 +95,7 @@ public class SeqScanExec extends PhysicalExec {
     // However, actual values absent in tuples. So, Replace all column references by constant datum.
     for (Column column : columnPartitionSchema.toArray()) {
       FieldEval targetExpr = new FieldEval(column);
-      EvalContext evalContext = targetExpr.newContext();
-      targetExpr.eval(evalContext, columnPartitionSchema, partitionRow);
-      Datum datum = targetExpr.terminate(evalContext);
+      Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
       ConstEval constExpr = new ConstEval(datum);
       for (Target target : plan.getTargets()) {
         if (target.getEvalTree().equals(targetExpr)) {
@@ -151,7 +144,6 @@ public class SeqScanExec extends PhysicalExec {
     }
 
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
-    this.evalContexts = projector.newContexts();
 
     if (fragments.length > 1) {
       this.scanner = new MergeScanner(context.getConf(), plan.getTableSchema(), plan.getTableDesc().getMeta(),
@@ -172,8 +164,7 @@ public class SeqScanExec extends PhysicalExec {
 
     if (!plan.hasQual()) {
       if ((tuple = scanner.next()) != null) {
-        projector.eval(evalContexts, tuple);
-        projector.terminate(evalContexts, outTuple);
+        projector.eval(tuple, outTuple);
         outTuple.setOffset(tuple.getOffset());
         return outTuple;
       } else {
@@ -181,10 +172,9 @@ public class SeqScanExec extends PhysicalExec {
       }
     } else {
       while ((tuple = scanner.next()) != null) {
-        qual.eval(qualCtx, inSchema, tuple);
-        if (qual.terminate(qualCtx).isTrue()) {
-          projector.eval(evalContexts, tuple);
-          projector.terminate(evalContexts, outTuple);
+
+        if (qual.eval(inSchema, tuple).isTrue()) {
+          projector.eval(tuple, outTuple);
           return outTuple;
         }
       }
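
The same single-call evaluation also simplifies SeqScanExec outside the scan loop: when a column-partitioned table is rewritten, a partition column's value can be materialized in one step because FieldEval.eval now returns the Datum for that column. A compact sketch of that substitution, reusing the names from the hunk above (column, columnPartitionSchema and partitionRow come from the surrounding method):

    FieldEval targetExpr = new FieldEval(column);
    // read the partition column's value for this row directly as a Datum
    Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
    // wrap it as a constant so targets no longer reference a column absent from the data tuples
    ConstEval constExpr = new ConstEval(datum);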

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
index af1bf34..dbe45dc 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortAggregateExec.java
@@ -18,82 +18,108 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.tajo.worker.TaskAttemptContext;
+import org.apache.tajo.engine.function.FunctionContext;
 import org.apache.tajo.engine.planner.logical.GroupbyNode;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
 
 /**
- * This is the sort-based Aggregation Operator.
+ * This is the sort-based aggregation operator.
+ *
+ * <h3>Implementation</h3>
+ * Sort Aggregation has two states while running.
+ *
+ * <h4>Aggregate state</h4>
+ * If lastKey is null or equal to the current key, the operator enters (or stays in) this state.
+ * In this state, it accumulates measure values through the aggregation functions.
+ *
+ * <h4>Finalize state</h4>
+ * If the current key differs from the last key, the operator computes the final aggregation results
+ * and then emits an output tuple.
  */
 public class SortAggregateExec extends AggregationExec {
-  private Tuple prevKey = null;
+  private Tuple lastKey = null;
   private boolean finished = false;
+  private FunctionContext[] contexts;
 
-
-  public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan,
-                           PhysicalExec child) throws IOException {
+  public SortAggregateExec(TaskAttemptContext context, GroupbyNode plan, PhysicalExec child) throws IOException {
     super(context, plan, child);
+    contexts = new FunctionContext[plan.getAggFunctions().length];
   }
 
   @Override
   public Tuple next() throws IOException {
-    Tuple curKey;
+    Tuple currentKey;
     Tuple tuple;
-    Tuple finalTuple = null;
+    Tuple outputTuple = null;
+
     while(!context.isStopped() && (tuple = child.next()) != null) {
-      // build a key tuple
-      curKey = new VTuple(keylist.length);
-      for(int i = 0; i < keylist.length; i++) {
-        curKey.put(i, tuple.get(keylist[i]));
+
+      // get a key tuple
+      currentKey = new VTuple(groupingKeyIds.length);
+      for(int i = 0; i < groupingKeyIds.length; i++) {
+        currentKey.put(i, tuple.get(groupingKeyIds[i]));
       }
 
-      if (prevKey == null || prevKey.equals(curKey)) {
-        if (prevKey == null) {
-          for(int i = 0; i < outSchema.getColumnNum(); i++) {
-            evalContexts[i] = evals[i].newContext();
-            evals[i].eval(evalContexts[i], inSchema, tuple);
+      /** Aggregation State */
+      if (lastKey == null || lastKey.equals(currentKey)) {
+        if (lastKey == null) {
+          for(int i = 0; i < aggFunctionsNum; i++) {
+            contexts[i] = aggFunctions[i].newContext();
+            aggFunctions[i].merge(contexts[i], inSchema, tuple);
           }
-          prevKey = curKey;
+          lastKey = currentKey;
         } else {
           // aggregate
-          for (int idx : measureList) {
-            evals[idx].eval(evalContexts[idx], inSchema, tuple);
+          for (int i = 0; i < aggFunctionsNum; i++) {
+            aggFunctions[i].merge(contexts[i], inSchema, tuple);
           }
         }
-      } else {
+
+      } else { /** Finalization State */
         // finalize aggregate and return
-        finalTuple = new VTuple(outSchema.getColumnNum());
-        for(int i = 0; i < outSchema.getColumnNum(); i++) {
-          finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+        outputTuple = new VTuple(outSchema.getColumnNum());
+        int tupleIdx = 0;
+
+        for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+          outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+        }
+        for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+          outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
         }
 
-        for(int i = 0; i < outSchema.getColumnNum(); i++) {
-          evalContexts[i] = evals[i].newContext();
-          evals[i].eval(evalContexts[i], inSchema, tuple);
+        for(int evalIdx = 0; evalIdx < aggFunctionsNum; evalIdx++) {
+          contexts[evalIdx] = aggFunctions[evalIdx].newContext();
+          aggFunctions[evalIdx].merge(contexts[evalIdx], inSchema, tuple);
         }
-        prevKey = curKey;
-        return finalTuple;
+
+        lastKey = currentKey;
+        return outputTuple;
       }
     } // while loop
 
     if (!finished) {
-      finalTuple = new VTuple(outSchema.getColumnNum());
-      for(int i = 0; i < outSchema.getColumnNum(); i++) {
-        finalTuple.put(i, evals[i].terminate(evalContexts[i]));
+      outputTuple = new VTuple(outSchema.getColumnNum());
+      int tupleIdx = 0;
+      for(; tupleIdx < groupingKeyNum; tupleIdx++) {
+        outputTuple.put(tupleIdx, lastKey.get(tupleIdx));
+      }
+      for(int aggFuncIdx = 0; aggFuncIdx < aggFunctionsNum; tupleIdx++, aggFuncIdx++) {
+        outputTuple.put(tupleIdx, aggFunctions[aggFuncIdx].terminate(contexts[aggFuncIdx]));
       }
       finished = true;
     }
-    return finalTuple;
+    return outputTuple;
   }
 
   @Override
   public void rescan() throws IOException {
     super.rescan();
 
-    prevKey = null;
+    lastKey = null;
     finished = false;
   }
 }
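
The two states described in the javadoc above rest on a simple per-function lifecycle: allocate a FunctionContext when a new group starts, merge each of the group's tuples into it, and terminate it once the grouping key changes. A reduced sketch of that lifecycle for a single group (aggFunctions, contexts, inSchema and tuple as in the class above; key comparison and output-tuple assembly are omitted):

    // group start: fresh accumulation state for every aggregation function
    for (int i = 0; i < aggFunctions.length; i++) {
      contexts[i] = aggFunctions[i].newContext();
    }
    // for each tuple of the group: fold the row into every function's context
    for (int i = 0; i < aggFunctions.length; i++) {
      aggFunctions[i].merge(contexts[i], inSchema, tuple);
    }
    // group end (key changed or input exhausted): read out the final aggregate values
    for (int i = 0; i < aggFunctions.length; i++) {
      Datum result = aggFunctions[i].terminate(contexts[i]);  // one result per function
    }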

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/e23e78cc/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
index 72ff67c..041220a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/rewrite/PartitionedTableRewriter.java
@@ -95,16 +95,13 @@ public class PartitionedTableRewriter implements RewriteRule {
   }
 
   private static class PartitionPathFilter implements PathFilter {
-    private FileSystem fs;
     private Schema schema;
     private EvalNode partitionFilter;
-    private EvalContext evalContext;
 
 
     public PartitionPathFilter(Schema schema, EvalNode partitionFilter) {
       this.schema = schema;
       this.partitionFilter = partitionFilter;
-      evalContext = partitionFilter.newContext();
     }
 
     @Override
@@ -113,8 +110,8 @@ public class PartitionedTableRewriter implements RewriteRule {
       if (tuple == null) { // if it is a file or not acceptable file
         return false;
       }
-      partitionFilter.eval(evalContext, schema, tuple);
-      return partitionFilter.terminate(evalContext).asBool();
+
+      return partitionFilter.eval(schema, tuple).asBool();
     }
 
     @Override