You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/08/23 19:38:12 UTC

[23/25] TAJO-906: Runtime code generation for evaluating expression trees.

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
index 9ac0e62..a690759 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimilarToPredicateEval.java
@@ -24,6 +24,11 @@ import java.util.regex.PatternSyntaxException;
 public class SimilarToPredicateEval extends PatternMatchPredicateEval {
   private static final String SIMILARTO_ESCAPE_SPATIAL_CHARACTERS = "([.])";
 
+  public SimilarToPredicateEval(boolean not, EvalNode field, ConstEval pattern,
+                                @SuppressWarnings("unused") boolean isCaseSensitive) {
+    super(EvalType.SIMILAR_TO, not, field, pattern, false);
+  }
+
   public SimilarToPredicateEval(boolean not, EvalNode field, ConstEval pattern) {
     super(EvalType.SIMILAR_TO, not, field, pattern);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
index 15e34de..15b628b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/SimpleEvalNodeVisitor.java
@@ -60,9 +60,6 @@ public abstract class SimpleEvalNodeVisitor<CONTEXT> {
       case IF_THEN:
         result = visitIfThen(context, (CaseWhenEval.IfThenEval) evalNode, stack);
         break;
-      case IN:
-        result = visitInPredicate(context, (InEval) evalNode, stack);
-        break;
 
       // Functions
       case FUNCTION:

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
index 8c714c5..05640c9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceBoolean.java
@@ -32,12 +32,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
     example = "> SELECT coalesce(null, null, true);\n"
         + "true",
     returnType = Type.BOOLEAN,
-    paramTypes = {@ParamTypes(paramTypes = {Type.BOOLEAN, TajoDataTypes.Type.BOOLEAN_ARRAY})}
+    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.BOOLEAN, TajoDataTypes.Type.BOOLEAN_ARRAY})}
 )
 public class CoalesceBoolean extends Coalesce {
   public CoalesceBoolean() {
     super(new Column[] {
-        new Column("column", TajoDataTypes.Type.BOOLEAN),
+        new Column("param", TajoDataTypes.Type.BOOLEAN),
         new Column("params", TajoDataTypes.Type.BOOLEAN_ARRAY),
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
index 23f8f0c..35df518 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDate.java
@@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
     example = "> SELECT coalesce(null, null, date '2014-01-01');\n"
         + "2014-01-01",
     returnType = Type.DATE,
-    paramTypes = {@ParamTypes(paramTypes = {Type.DATE, Type.DATE_ARRAY})}
+    paramTypes = {@ParamTypes(paramTypes = {Type.DATE_ARRAY})}
 )
 public class CoalesceDate extends Coalesce {
   public CoalesceDate() {
     super(new Column[] {
-        new Column("column", TajoDataTypes.Type.DATE),
         new Column("params", TajoDataTypes.Type.DATE_ARRAY),
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
index 3e94150..47c363f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceDouble.java
@@ -31,12 +31,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
     example = "> SELECT coalesce(null, null, 10.0);\n"
         + "10.0",
     returnType = TajoDataTypes.Type.FLOAT8,
-    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8, TajoDataTypes.Type.FLOAT8_ARRAY})}
+    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8_ARRAY})}
 )
 public class CoalesceDouble extends Coalesce {
   public CoalesceDouble() {
     super(new Column[] {
-        new Column("column", TajoDataTypes.Type.FLOAT8),
         new Column("params", TajoDataTypes.Type.FLOAT8_ARRAY),
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
index 5d55255..f975615 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceLong.java
@@ -31,13 +31,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
     example = "> SELECT coalesce(null, null, 10);\n"
         + "10",
     returnType = TajoDataTypes.Type.INT8,
-    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8, TajoDataTypes.Type.INT8_ARRAY})}
+    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8_ARRAY})}
 )
 public class CoalesceLong extends Coalesce {
 
   public CoalesceLong() {
     super(new Column[] {
-        new Column("column", TajoDataTypes.Type.INT8),
         new Column("params", TajoDataTypes.Type.INT8_ARRAY),
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
index 50e4786..6441e00 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceString.java
@@ -31,13 +31,12 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
     example = "> SELECT coalesce(null, null, 'default');\n"
         + "default",
     returnType = TajoDataTypes.Type.TEXT,
-    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.TEXT, TajoDataTypes.Type.TEXT_ARRAY})}
+    paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.TEXT_ARRAY})}
 )
 public class CoalesceString extends Coalesce {
 
   public CoalesceString() {
     super(new Column[] {
-        new Column("column", TajoDataTypes.Type.TEXT),
         new Column("params", TajoDataTypes.Type.TEXT_ARRAY),
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
index 01bb6de..56cfe32 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTime.java
@@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
     example = "> SELECT coalesce(null, null, time '12:10:00');\n"
         + "12:10:00",
     returnType = Type.TIME,
-    paramTypes = {@ParamTypes(paramTypes = {Type.TIME, Type.TIME_ARRAY})}
+    paramTypes = {@ParamTypes(paramTypes = {Type.TIME_ARRAY})}
 )
 public class CoalesceTime extends Coalesce {
   public CoalesceTime() {
     super(new Column[] {
-        new Column("column", TajoDataTypes.Type.TIME),
         new Column("params", TajoDataTypes.Type.TIME_ARRAY),
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
index 2609717..ec02e46 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/builtin/CoalesceTimestamp.java
@@ -32,12 +32,11 @@ import org.apache.tajo.engine.function.annotation.ParamTypes;
     example = "> SELECT coalesce(null, null, timestamp '2014-01-01');\n"
         + "2014-01-01 00:00:00",
     returnType = Type.TIMESTAMP,
-    paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP, Type.TIMESTAMP_ARRAY})}
+    paramTypes = {@ParamTypes(paramTypes = {Type.TIMESTAMP_ARRAY})}
 )
 public class CoalesceTimestamp extends Coalesce {
   public CoalesceTimestamp() {
     super(new Column[] {
-        new Column("column", TajoDataTypes.Type.TIMESTAMP),
         new Column("params", TajoDataTypes.Type.TIMESTAMP_ARRAY),
     });
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
index 6a3af98..574e32c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/ExprAnnotator.java
@@ -439,11 +439,18 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
   @Override
   public EvalNode visitConcatenate(Context ctx, Stack<Expr> stack, BinaryOperator expr) throws PlanningException {
     stack.push(expr);
-    EvalNode left = visit(ctx, stack, expr.getLeft());
-    EvalNode right = visit(ctx, stack, expr.getRight());
+    EvalNode lhs = visit(ctx, stack, expr.getLeft());
+    EvalNode rhs = visit(ctx, stack, expr.getRight());
     stack.pop();
 
-    return new BinaryEval(EvalType.CONCATENATE, left, right);
+    if (lhs.getValueType().getType() != Type.TEXT) {
+      lhs = convertType(lhs, CatalogUtil.newSimpleDataType(Type.TEXT));
+    }
+    if (rhs.getValueType().getType() != Type.TEXT) {
+      rhs = convertType(rhs, CatalogUtil.newSimpleDataType(Type.TEXT));
+    }
+
+    return new BinaryEval(EvalType.CONCATENATE, lhs, rhs);
   }
 
   private EvalNode visitPatternMatchPredicate(Context ctx, Stack<Expr> stack, PatternMatchPredicate expr)
@@ -599,7 +606,7 @@ public class ExprAnnotator extends BaseAlgebraVisitor<ExprAnnotator.Context, Eva
 
     // trying the implicit type conversion between actual parameter types and the definition types.
     if (CatalogUtil.checkIfVariableLengthParamDefinition(TUtil.newList(funcDesc.getParamTypes()))) {
-      DataType lastDataType = null;
+      DataType lastDataType = funcDesc.getParamTypes()[0];
       for (int i = 0; i < givenArgs.length; i++) {
         if (i < (funcDesc.getParamTypes().length - 1)) { // variable length
           lastDataType = funcDesc.getParamTypes()[i];

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index e34548c..2730202 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -55,6 +55,7 @@ import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.List;
 import java.util.Stack;
 
@@ -147,6 +148,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         stack.push(selNode);
         leftExec = createPlanRecursive(ctx, selNode.getChild(), stack);
         stack.pop();
+
         return new SelectionExec(ctx, selNode, leftExec);
 
       case PROJECTION:
@@ -154,6 +156,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         stack.push(prjNode);
         leftExec = createPlanRecursive(ctx, prjNode.getChild(), stack);
         stack.pop();
+
         return new ProjectionExec(ctx, prjNode, leftExec);
 
       case TABLE_SUBQUERY: {
@@ -210,6 +213,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
         leftExec = createPlanRecursive(ctx, joinNode.getLeftChild(), stack);
         rightExec = createPlanRecursive(ctx, joinNode.getRightChild(), stack);
         stack.pop();
+
         return createJoinPlan(ctx, joinNode, leftExec, rightExec);
 
       case UNION:

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
index 161d39b..d8499d0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -18,34 +18,49 @@
 
 package org.apache.tajo.engine.planner;
 
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
 
 public class Projector {
+  private final TaskAttemptContext context;
   private final Schema inSchema;
+  private final Target [] targets;
 
   // for projection
   private final int targetNum;
   private final EvalNode[] evals;
 
-  public Projector(Schema inSchema, Schema outSchema, Target [] targets) {
+  public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) {
+    this.context = context;
     this.inSchema = inSchema;
     if (targets == null) {
-      targets = PlannerUtil.schemaToTargets(outSchema);
+      this.targets = PlannerUtil.schemaToTargets(outSchema);
+    } else {
+      this.targets = targets;
     }
-    this.targetNum = targets.length;
+
+    this.targetNum = this.targets.length;
     evals = new EvalNode[targetNum];
-    for (int i = 0; i < targetNum; i++) {
-      evals[i] = targets[i].getEvalTree();
+
+    if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+      EvalNode eval;
+      for (int i = 0; i < targetNum; i++) {
+        eval = this.targets[i].getEvalTree();
+        evals[i] = context.getPrecompiledEval(inSchema, eval);
+      }
+    } else {
+      for (int i = 0; i < targetNum; i++) {
+        evals[i] = this.targets[i].getEvalTree();
+      }
     }
   }
 
   public void eval(Tuple in, Tuple out) {
-    if (targetNum > 0) {
-      for (int i = 0; i < evals.length; i++) {
-        out.put(i, evals[i].eval(inSchema, in));
-      }
+    for (int i = 0; i < evals.length; i++) {
+      out.put(i, evals[i].eval(inSchema, in));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
index 6c45868..aa6d597 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/HavingNode.java
@@ -22,21 +22,26 @@ import com.google.gson.annotations.Expose;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlanString;
 
-public class HavingNode extends UnaryNode implements Cloneable {
+public class HavingNode extends UnaryNode implements SelectableNode, Cloneable {
 	@Expose private EvalNode qual;
 
   public HavingNode(int pid) {
     super(pid, NodeType.HAVING);
   }
 
-	public EvalNode getQual() {
-		return this.qual;
-	}
+  @Override
+  public boolean hasQual() {
+    return true;
+  }
 
-	public void setQual(EvalNode qual) {
+  public void setQual(EvalNode qual) {
 		this.qual = qual;
   }
 
+  public EvalNode getQual() {
+    return this.qual;
+  }
+
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof HavingNode) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
index 9582dee..8d28e6e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/ScanNode.java
@@ -30,7 +30,7 @@ import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.util.TUtil;
 
-public class ScanNode extends RelationNode implements Projectable, Cloneable {
+public class ScanNode extends RelationNode implements Projectable, SelectableNode, Cloneable {
 	@Expose protected TableDesc tableDesc;
   @Expose protected String alias;
   @Expose protected Schema logicalSchema;
@@ -101,6 +101,7 @@ public class ScanNode extends RelationNode implements Projectable, Cloneable {
     }
   }
 
+  @Override
   public Schema getTableSchema() {
     return logicalSchema;
   }
@@ -108,15 +109,18 @@ public class ScanNode extends RelationNode implements Projectable, Cloneable {
   public Schema getPhysicalSchema() {
     return getInSchema();
   }
-	
+
+  @Override
 	public boolean hasQual() {
 	  return qual != null;
 	}
-	
+
+  @Override
 	public EvalNode getQual() {
 	  return this.qual;
 	}
-	
+
+  @Override
 	public void setQual(EvalNode evalTree) {
 	  this.qual = evalTree;
 	}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java
new file mode 100644
index 0000000..7082f4b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectableNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.logical;
+
+import org.apache.tajo.engine.eval.EvalNode;
+
+/**
+ * An interface for logical node which is able to filter tuples.
+ */
+public interface SelectableNode {
+
+  /**
+   * Checking if it has filter condition
+   *
+   * @return True if it has filter condition. Otherwise, it will return false.
+   */
+  public boolean hasQual();
+
+  /**
+   * Set a filter condition.
+   *
+   * @param eval EvalNode resulting in a boolean result.
+   */
+  public void setQual(EvalNode eval);
+
+  /**
+   * Get a filter condition
+   *
+   * @return Filter Condition
+   */
+  public EvalNode getQual();
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
index b8a9680..3bbbd82 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/logical/SelectionNode.java
@@ -22,21 +22,26 @@ import com.google.gson.annotations.Expose;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlanString;
 
-public class SelectionNode extends UnaryNode implements Cloneable {
+public class SelectionNode extends UnaryNode implements SelectableNode, Cloneable {
 	@Expose private EvalNode qual;
 
   public SelectionNode(int pid) {
     super(pid, NodeType.SELECTION);
   }
 
-	public EvalNode getQual() {
-		return this.qual;
-	}
+  @Override
+  public boolean hasQual() {
+    return true;
+  }
 
-	public void setQual(EvalNode qual) {
+  public void setQual(EvalNode qual) {
 		this.qual = qual;
 	}
 
+  public EvalNode getQual() {
+    return this.qual;
+  }
+
   @Override
   public PlanString getPlanString() {
     PlanString planStr = new PlanString(this);

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
index 60a7c19..91cefa1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BNLJoinExec.java
@@ -79,13 +79,20 @@ public class BNLJoinExec extends BinaryPhysicalExec {
       plan.setTargets(PlannerUtil.schemaToTargets(outSchema));
     }
 
-    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     frameTuple = new FrameTuple();
     outputTuple = new VTuple(outSchema.size());
   }
 
+  @Override
+  protected void compile() {
+    if (hasJoinQual) {
+      joinQual = context.getPrecompiledEval(inSchema, joinQual);
+    }
+  }
+
   public JoinNode getPlan() {
     return plan;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index 35de707..f831525 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -59,7 +59,7 @@ public class BSTIndexScanExec extends PhysicalExec {
     this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
         scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
     this.fileScanner.init();
-    this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());
 
     this.reader = new BSTIndex(sm.getFileSystem().getConf()).
         getIndexReader(fileName, keySchema, comparator);

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
index f6f3e52..42611b0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BasicPhysicalExecutorVisitor.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
+
 import java.util.Stack;
 
 public class BasicPhysicalExecutorVisitor<CONTEXT, RESULT> implements PhysicalExecutorVisitor<CONTEXT, RESULT> {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
index 628c18c..03ec396 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BinaryPhysicalExec.java
@@ -52,6 +52,8 @@ public abstract class BinaryPhysicalExec extends PhysicalExec {
     leftChild.init();
     rightChild.init();
     progress = 0.0f;
+
+    super.init();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 6215527..700e34d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -34,6 +34,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.Scanner;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
index 65ebe2f..9dabbb3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashFullOuterJoinExec.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.engine.codegen.CompilationError;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.Projector;
@@ -91,7 +92,7 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
     }
 
     // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     frameTuple = new FrameTuple();
@@ -102,6 +103,11 @@ public class HashFullOuterJoinExec extends BinaryPhysicalExec {
     rightNumCols = inner.getSchema().size();
   }
 
+  @Override
+  protected void compile() throws CompilationError {
+    joinQual = context.getPrecompiledEval(inSchema, joinQual);
+  }
+
   protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
     for (int i = 0; i < leftKeyList.length; i++) {
       keyTuple.put(i, outerTuple.get(leftKeyList[i]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
index a5e9df0..426a7a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java
@@ -82,7 +82,7 @@ public class HashJoinExec extends BinaryPhysicalExec {
     }
 
     // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     frameTuple = new FrameTuple();
@@ -90,6 +90,11 @@ public class HashJoinExec extends BinaryPhysicalExec {
     leftKeyTuple = new VTuple(leftKeyList.length);
   }
 
+  @Override
+  protected void compile() {
+    joinQual = context.getPrecompiledEval(inSchema, joinQual);
+  }
+
   protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
     for (int i = 0; i < leftKeyList.length; i++) {
       keyTuple.put(i, outerTuple.get(leftKeyList[i]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index ac8b28f..b752db5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -109,7 +109,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
     }
 
     // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     frameTuple = new FrameTuple();
@@ -119,6 +119,11 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
     rightNumCols = rightChild.getSchema().size();
   }
 
+  @Override
+  protected void compile() {
+    joinQual = context.getPrecompiledEval(inSchema, joinQual);
+  }
+
   protected void getKeyLeftTuple(final Tuple outerTuple, Tuple keyTuple) {
     for (int i = 0; i < leftKeyList.length; i++) {
       keyTuple.put(i, outerTuple.get(leftKeyList[i]));

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
index 4fbb5e4..4fdd03a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftSemiJoinExec.java
@@ -45,6 +45,10 @@ public class HashLeftSemiJoinExec extends HashJoinExec {
     }
   }
 
+  protected void compile() {
+    joinQual = context.getPrecompiledEval(inSchema, joinQual);
+  }
+
   /**
    * The End of Tuple (EOT) condition is true only when no more tuple in the left relation (on disk).
    * next() method finds the first unmatched tuple from both tables.

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
index ff1f7b3..e1cc6a8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeFullOuterJoinExec.java
@@ -88,7 +88,7 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
         plan.getJoinQual(), leftChild.getSchema(), rightChild.getSchema());
 
     // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     frameTuple = new FrameTuple();
@@ -98,6 +98,11 @@ public class MergeFullOuterJoinExec extends BinaryPhysicalExec {
     rightNumCols = rightChild.getSchema().size();
   }
 
+  @Override
+  protected void compile() {
+    joinQual = context.getPrecompiledEval(inSchema, joinQual);
+  }
+
   public JoinNode getPlan(){
     return this.joinNode;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
index 470e1c9..bbfe973 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/MergeJoinExec.java
@@ -84,13 +84,18 @@ public class MergeJoinExec extends BinaryPhysicalExec {
     this.innerIterator = innerTupleSlots.iterator();
     
     // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     frameTuple = new FrameTuple();
     outTuple = new VTuple(outSchema.size());
   }
 
+  @Override
+  protected void compile() {
+    joinQual = context.getPrecompiledEval(inSchema, joinQual);
+  }
+
   public JoinNode getPlan(){
     return this.joinNode;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
index 6e5900e..dc061ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLJoinExec.java
@@ -54,7 +54,7 @@ public class NLJoinExec extends BinaryPhysicalExec {
     }
 
     // for projection
-    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     needNewOuter = true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
index 5c17c40..37ef7df 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/NLLeftOuterJoinExec.java
@@ -57,7 +57,7 @@ public class NLLeftOuterJoinExec extends BinaryPhysicalExec {
     }
 
     // for projection
-    projector = new Projector(inSchema, outSchema, plan.getTargets());
+    projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     needNextRightTuple = true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index 9fa5b76..d87a30d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -62,8 +62,9 @@ public class PartitionMergeScanExec extends PhysicalExec {
 
   public void init() throws IOException {
     for (CatalogProtos.FragmentProto fragment : fragments) {
-      scanners.add(new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
-          new CatalogProtos.FragmentProto[] {fragment}));
+      SeqScanExec scanExec = new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan),
+          new CatalogProtos.FragmentProto[] {fragment});
+      scanners.add(scanExec);
     }
     progress = 0.0f;
     rescan();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index e30a10b..31cfc4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -20,9 +20,11 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaObject;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.engine.codegen.CompilationError;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -47,7 +49,14 @@ public abstract class PhysicalExec implements SchemaObject {
     return outSchema;
   }
 
-  public abstract void init() throws IOException;
+  public void init() throws IOException {
+    if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+      this.compile();
+    }
+  }
+
+  protected void compile() throws CompilationError {
+  }
 
   public abstract Tuple next() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
index 738db62..505b599 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExecutorVisitor.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
+
 import java.util.Stack;
 
 public interface PhysicalExecutorVisitor<CONTEXT, RESULT> {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
index 0909c76..2f55cf7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java
@@ -20,6 +20,7 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.engine.planner.PhysicalPlanningException;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
 import org.apache.tajo.engine.query.QueryContext;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
deleted file mode 100644
index 62add1e..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanningException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.planner.physical;
-
-import java.io.IOException;
-
-public class PhysicalPlanningException extends IOException {
-  public PhysicalPlanningException(String message) {
-    super(message);
-  }
-
-  public PhysicalPlanningException(Exception ioe) {
-    super(ioe);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
index ee6ef1d..89cd75a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ProjectionExec.java
@@ -46,7 +46,7 @@ public class ProjectionExec extends UnaryPhysicalExec {
     super.init();
 
     this.outTuple = new VTuple(outSchema.size());
-    this.projector = new Projector(inSchema, outSchema, this.plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, this.plan.getTargets());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
index 365fc22..5d4dad5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RightOuterMergeJoinExec.java
@@ -87,7 +87,7 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
         plan.getJoinQual(), outer.getSchema(), inner.getSchema());
 
     // for projection
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
 
     // for join
     frameTuple = new FrameTuple();
@@ -96,6 +96,11 @@ public class RightOuterMergeJoinExec extends BinaryPhysicalExec {
     leftNumCols = outer.getSchema().size();
   }
 
+  @Override
+  protected void compile() {
+    joinQual = context.getPrecompiledEval(inSchema, joinQual);
+  }
+
   public JoinNode getPlan() {
     return this.joinNode;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
index 2e676e9..5ae9a8f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SelectionExec.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.tajo.engine.codegen.CompilationError;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.logical.SelectionNode;
 import org.apache.tajo.storage.Tuple;
@@ -26,7 +27,7 @@ import org.apache.tajo.worker.TaskAttemptContext;
 import java.io.IOException;
 
 public class SelectionExec extends UnaryPhysicalExec  {
-  private final EvalNode qual;
+  private EvalNode qual;
 
   public SelectionExec(TaskAttemptContext context,
                        SelectionNode plan,
@@ -36,6 +37,11 @@ public class SelectionExec extends UnaryPhysicalExec  {
   }
 
   @Override
+  public void compile() throws CompilationError {
+    qual = context.getPrecompiledEval(inSchema, qual);
+  }
+
+  @Override
   public Tuple next() throws IOException {
     Tuple tuple;
     while ((tuple = child.next()) != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 507cb6c..2f0c12f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -26,6 +26,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.codegen.CompilationError;
 import org.apache.tajo.engine.eval.ConstEval;
 import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.eval.EvalTreeUtil;
@@ -66,8 +67,8 @@ public class SeqScanExec extends PhysicalExec {
 
   private boolean cacheRead = false;
 
-  public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm,
-                     ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException {
+  public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan,
+                     CatalogProtos.FragmentProto [] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
 
     this.plan = plan;
@@ -87,6 +88,12 @@ public class SeqScanExec extends PhysicalExec {
       cacheKey = new TupleCacheKey(
           context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey);
     }
+
+    if (fragments != null
+        && plan.getTableDesc().hasPartition()
+        && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
+      rewriteColumnPartitionedTableSchema();
+    }
   }
 
   /**
@@ -120,12 +127,16 @@ public class SeqScanExec extends PhysicalExec {
       Datum datum = targetExpr.eval(columnPartitionSchema, partitionRow);
       ConstEval constExpr = new ConstEval(datum);
 
-      for (Target target : plan.getTargets()) {
+
+      for (int i = 0; i < plan.getTargets().length; i++) {
+        Target target = plan.getTargets()[i];
+
         if (target.getEvalTree().equals(targetExpr)) {
           if (!target.hasAlias()) {
             target.setAlias(target.getEvalTree().getName());
           }
           target.setExpr(constExpr);
+
         } else {
           EvalTreeUtil.replace(target.getEvalTree(), targetExpr, constExpr);
         }
@@ -140,12 +151,6 @@ public class SeqScanExec extends PhysicalExec {
   public void init() throws IOException {
     Schema projected;
 
-    if (fragments != null
-        && plan.getTableDesc().hasPartition()
-        && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
-      rewriteColumnPartitionedTableSchema();
-    }
-
     if (plan.hasTargets()) {
       projected = new Schema();
       Set<Column> columnSet = new HashSet<Column>();
@@ -193,10 +198,19 @@ public class SeqScanExec extends PhysicalExec {
     } else {
       initScanner(projected);
     }
+
+    super.init();
+  }
+
+  @Override
+  protected void compile() throws CompilationError {
+    if (plan.hasQual()) {
+      qual = context.getPrecompiledEval(inSchema, qual);
+    }
   }
 
   private void initScanner(Schema projected) throws IOException {
-    this.projector = new Projector(inSchema, outSchema, plan.getTargets());
+    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());
     if (fragments != null) {
       if (fragments.length > 1) {
         this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), plan.getTableDesc().getMeta(),

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
index ab67d7b..2c63ea6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/UnaryPhysicalExec.java
@@ -46,13 +46,17 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
     this.child = child;
   }
 
+  @Override
   public void init() throws IOException {
     progress = 0.0f;
     if (child != null) {
       child.init();
     }
+
+    super.init();
   }
 
+  @Override
   public void rescan() throws IOException {
     progress = 0.0f;
     if (child != null) {
@@ -60,6 +64,7 @@ public abstract class UnaryPhysicalExec extends PhysicalExec {
     }
   }
 
+  @Override
   public void close() throws IOException {
     progress = 1.0f;
     if (child != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
index 56df48d..ef82427 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
@@ -161,7 +161,7 @@ public class QueryUnitRequestImpl implements QueryUnitRequest {
 		this.serializedData = p.getSerializedData();
 		return this.serializedData;
 	}
-	
+
 	public boolean isInterQuery() {
 	  QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
     if (interQuery != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index fc1be9b..2c62d42 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -60,6 +60,7 @@ import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -115,9 +116,20 @@ public class GlobalEngine extends AbstractService {
     super.stop();
   }
 
+  private QueryContext createQueryContext(Session session) {
+    QueryContext newQueryContext =  new QueryContext(context.getConf(), session);
+
+    String tajoTest = System.getProperty(CommonTestingUtil.TAJO_TEST_KEY);
+    if (tajoTest != null && tajoTest.equalsIgnoreCase(CommonTestingUtil.TAJO_TEST_TRUE)) {
+      newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest());
+    }
+
+    return newQueryContext;
+  }
+
   public SubmitQueryResponse executeQuery(Session session, String query, boolean isJson) {
     LOG.info("Query: " + query);
-    QueryContext queryContext = new QueryContext(context.getConf(), session);
+    QueryContext queryContext = createQueryContext(session);
     Expr planningContext;
 
     try {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
new file mode 100644
index 0000000..9a4a01d
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LaunchTaskRunnersEvent.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.engine.query.QueryContext;
+
+import java.util.Collection;
+
+public class LaunchTaskRunnersEvent extends TaskRunnerGroupEvent {
+  private final QueryContext queryContext;
+  private final String planJson;
+
+  public LaunchTaskRunnersEvent(ExecutionBlockId executionBlockId,
+                                Collection<Container> containers, QueryContext queryContext, String planJson) {
+    super(EventType.CONTAINER_REMOTE_LAUNCH, executionBlockId, containers);
+    this.queryContext = queryContext;
+    this.planJson = planJson;
+  }
+
+  public QueryContext getQueryContext() {
+    return queryContext;
+  }
+
+  public String getPlanJson() {
+    return planJson;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index cb7861c..f3e4b72 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -42,10 +43,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class TajoContainerProxy extends ContainerProxy {
+  private final QueryContext queryContext;
+  private final String planJson;
+
   public TajoContainerProxy(QueryMasterTask.QueryMasterTaskContext context,
                             Configuration conf, Container container,
-                            ExecutionBlockId executionBlockId) {
+                            QueryContext queryContext, ExecutionBlockId executionBlockId, String planJson) {
     super(context, conf, executionBlockId, container);
+    this.queryContext = queryContext;
+    this.planJson = planJson;
   }
 
   @Override
@@ -101,6 +107,8 @@ public class TajoContainerProxy extends ContainerProxy {
               .setNodeId(container.getNodeId().toString())
               .setContainerId(container.getId().toString())
               .setQueryOutputPath(context.getStagingDir().toString())
+              .setQueryContext(queryContext.getProto())
+              .setPlanJson(planJson)
               .build();
 
       tajoWorkerRpcClient.executeExecutionBlock(null, request, NullCallback.get());

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 4e51460..8d4a6a3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -214,7 +214,7 @@ public class TajoMaster extends CompositeService {
   }
 
   private void initWebServer() throws Exception {
-    if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+    if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
       InetSocketAddress address = systemConf.getSocketAddrVar(ConfVars.TAJO_MASTER_INFO_ADDRESS);
       webServer = StaticHttpServer.getInstance(this ,"admin", address.getHostName(), address.getPort(),
           true, null, context.getConf(), null);

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index deadd39..3a86802 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -190,7 +190,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
     builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds));
     TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build();
 
-    List<IntermediateEntryProto> intermediateEntries = new ArrayList<IntermediateEntryProto>();
     for (TajoMasterProtocol.WorkerResourceProto worker : workers) {
       try {
         if (worker.getPeerRpcPort() == 0) continue;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 40c5406..87da175 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -42,6 +42,8 @@ import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.plan.proto.PlanProto;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
@@ -1034,8 +1036,9 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         }
         LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!");
         subQuery.eventHandler.handle(
-            new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
-                subQuery.getId(), allocationEvent.getAllocatedContainer())
+            new LaunchTaskRunnersEvent(subQuery.getId(), allocationEvent.getAllocatedContainer(),
+                subQuery.getContext().getQueryContext(),
+                CoreGsonHelper.toJson(subQuery.getBlock().getPlan(), LogicalNode.class))
         );
 
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
new file mode 100644
index 0000000..a70fbfd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.engine.codegen.ExecutorPreCompiler;
+import org.apache.tajo.engine.codegen.TajoClassLoader;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.util.Pair;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ExecutionBlockSharedResource {
+  private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class);
+  private AtomicBoolean initializing = new AtomicBoolean(false);
+  private volatile Boolean resourceInitSuccess = new Boolean(false);
+  private CountDownLatch initializedResourceLatch = new CountDownLatch(1);
+
+  // Query
+  private QueryContext context;
+
+  // Resources
+  private TajoClassLoader classLoader;
+  private ExecutorPreCompiler.CompilationContext compilationContext;
+  private LogicalNode plan;
+  private boolean codeGenEnabled = false;
+
+  public void initialize(final QueryContext context, final String planJson) throws InterruptedException {
+
+    if (!initializing.getAndSet(true)) {
+      Thread thread = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            ExecutionBlockSharedResource.this.context = context;
+            initPlan(planJson);
+            initCodeGeneration();
+            resourceInitSuccess = true;
+          } catch (Throwable t) {
+            LOG.error(t);
+            LOG.error(ExceptionUtils.getStackTrace(t));
+          } finally {
+            initializedResourceLatch.countDown();
+          }
+        }
+      });
+      thread.run();
+      thread.join();
+
+      if (!resourceInitSuccess) {
+        throw new RuntimeException("Resource cannot be initialized");
+      }
+    }
+  }
+
+  private void initPlan(String planJson) {
+    plan = CoreGsonHelper.fromJson(planJson, LogicalNode.class);
+  }
+
+  private void initCodeGeneration() throws PlanningException {
+    if (context.getBool(SessionVars.CODEGEN)) {
+      codeGenEnabled = true;
+      classLoader = new TajoClassLoader();
+      compilationContext = new ExecutorPreCompiler.CompilationContext(classLoader);
+      ExecutorPreCompiler.compile(compilationContext, plan);
+    }
+  }
+
+  public boolean awaitInitializedResource() throws InterruptedException {
+    initializedResourceLatch.await();
+    return resourceInitSuccess;
+  }
+
+  public LogicalNode getPlan() {
+    return this.plan;
+  }
+
+  public EvalNode compileEval(Schema schema, EvalNode eval) {
+    return compilationContext.getCompiler().compile(schema, eval);
+  }
+
+  public EvalNode getPreCompiledEval(Schema schema, EvalNode eval) {
+    if (codeGenEnabled) {
+
+      Pair<Schema, EvalNode> key = new Pair<Schema, EvalNode>(schema, eval);
+      if (compilationContext.getPrecompiedEvals().containsKey(key)) {
+        return compilationContext.getPrecompiedEvals().get(key);
+      } else {
+        try {
+          LOG.warn(eval.toString() + " does not exists. Immediately compile it: " + eval);
+          return compileEval(schema, eval);
+        } catch (Throwable t) {
+          LOG.warn(t);
+          return eval;
+        }
+      }
+    } else {
+      throw new IllegalStateException("CodeGen is disabled");
+    }
+  }
+
+  public void release() {
+    compilationContext = null;
+
+    if (classLoader != null) {
+      try {
+        classLoader.clean();
+      } catch (Throwable throwable) {
+        throwable.printStackTrace();
+      }
+      classLoader = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index c7e513d..aaff69c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -31,10 +31,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.ContainerProxy;
-import org.apache.tajo.master.TajoContainerProxy;
-import org.apache.tajo.master.TaskRunnerGroupEvent;
-import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.*;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
@@ -143,19 +140,20 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     @Override
     public void handle(TaskRunnerGroupEvent event) {
       if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) {
-        launchTaskRunners(event.getExecutionBlockId(), event.getContainers());
+        LaunchTaskRunnersEvent launchEvent = (LaunchTaskRunnersEvent) event;
+        launchTaskRunners(launchEvent);
       } else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
         stopContainers(event.getContainers());
       }
     }
   }
 
-  private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+  private void launchTaskRunners(LaunchTaskRunnersEvent event) {
     // Query in standby mode doesn't need launch Worker.
     // But, Assign ExecutionBlock to assigned tajo worker
-    for(Container eachContainer: containers) {
+    for(Container eachContainer: event.getContainers()) {
       TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
-          eachContainer, executionBlockId);
+          eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
       executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index d8d09e1..f76176d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.shell.PathData;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.CatalogClient;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.ha.TajoMasterInfo;
 import org.apache.tajo.master.querymaster.QueryMaster;
@@ -55,6 +57,7 @@ import java.lang.management.ThreadMXBean;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -207,7 +210,7 @@ public class TajoWorker extends CompositeService {
         addService(pullService);
       }
 
-      if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
+      if (!systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
         try {
           httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
           if(queryMasterMode && !taskRunnerMode) {
@@ -349,6 +352,9 @@ public class TajoWorker extends CompositeService {
   }
 
   public class WorkerContext {
+    private ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource> sharedResourceMap =
+        new ConcurrentHashMap<ExecutionBlockId, ExecutionBlockSharedResource>();
+
     public QueryMaster getQueryMaster() {
       if(queryMasterManagerService == null) {
         return null;
@@ -356,6 +362,10 @@ public class TajoWorker extends CompositeService {
       return queryMasterManagerService.getQueryMaster();
     }
 
+    public TajoConf getConf() {
+      return systemConf;
+    }
+
     public TajoWorkerManagerService getTajoWorkerManagerService() {
       return tajoWorkerManagerService;
     }
@@ -398,6 +408,25 @@ public class TajoWorker extends CompositeService {
       }
     }
 
+    public void initSharedResource(QueryContext queryContext, ExecutionBlockId blockId, String planJson)
+        throws InterruptedException {
+
+      if (!sharedResourceMap.containsKey(blockId)) {
+        ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource();
+        if (sharedResourceMap.putIfAbsent(blockId, resource) == null) {
+          resource.initialize(queryContext, planJson);
+        }
+      }
+    }
+
+    public ExecutionBlockSharedResource getSharedResource(ExecutionBlockId blockId) {
+      return sharedResourceMap.get(blockId);
+    }
+
+    public void releaseSharedResource(ExecutionBlockId blockId) {
+      sharedResourceMap.remove(blockId).release();
+    }
+
     protected void cleanup(String strPath) {
       if(deletionService == null) return;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index c5f1446..fa116c3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -25,16 +25,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.ExecutionBlockReport;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.InetSocketAddress;
 
@@ -118,7 +116,12 @@ public class TajoWorkerManagerService extends CompositeService
                                     TajoWorkerProtocol.RunExecutionBlockRequestProto request,
                                     RpcCallback<PrimitiveProtos.BoolProto> done) {
     workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
+
     try {
+      workerContext.initSharedResource(
+          new QueryContext(workerContext.getConf(), request.getQueryContext()),
+          TajoIdUtils.createExecutionBlockId(request.getExecutionBlockId()), request.getPlanJson());
+
       String[] params = new String[7];
       params[0] = "standby";  //mode(never used)
       params[1] = request.getExecutionBlockId();
@@ -132,8 +135,8 @@ public class TajoWorkerManagerService extends CompositeService
       params[6] = request.getQueryOutputPath();
       workerContext.getTaskRunnerManager().startTask(params);
       done.run(TajoWorker.TRUE_PROTO);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
       done.run(TajoWorker.FALSE_PROTO);
     }
   }
@@ -163,6 +166,9 @@ public class TajoWorkerManagerService extends CompositeService
       workerContext.cleanup(inputDir);
       String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
       workerContext.cleanup(outputDir);
+
+      // Release shared resources
+      workerContext.releaseSharedResource(new ExecutionBlockId(executionBlockIdProto));
     }
     done.run(TajoWorker.TRUE_PROTO);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 1881685..d0665ae 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -157,6 +157,20 @@ public class Task {
     this.reporter = new Reporter(taskId, masterProxy);
     this.reporter.startCommunicationThread();
 
+
+    // resource intiailization
+    boolean resourceInitialized = false;
+    try {
+      resourceInitialized = context.getSharedResource().awaitInitializedResource();
+    } catch (InterruptedException e) {
+      LOG.error("Failed Resource Initialization", e);
+    } finally {
+      if (!resourceInitialized) {
+        setState(TaskAttemptState.TA_FAILED);
+        return;
+      }
+    }
+
     plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
     LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
     if (scanNode != null) {
@@ -218,25 +232,27 @@ public class Task {
   }
 
   public void init() throws IOException {
-    // initialize a task temporal dir
-    localFS.mkdirs(taskDir);
-
-    if (request.getFetches().size() > 0) {
-      inputTableBaseDir = localFS.makeQualified(
-          lDirAllocator.getLocalPathForWrite(
-              getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
-      localFS.mkdirs(inputTableBaseDir);
-      Path tableDir;
-      for (String inputTable : context.getInputTables()) {
-        tableDir = new Path(inputTableBaseDir, inputTable);
-        if (!localFS.exists(tableDir)) {
-          LOG.info("the directory is created  " + tableDir.toUri());
-          localFS.mkdirs(tableDir);
+    if (context.getState() == TaskAttemptState.TA_PENDING) {
+      // initialize a task temporal dir
+      localFS.mkdirs(taskDir);
+
+      if (request.getFetches().size() > 0) {
+        inputTableBaseDir = localFS.makeQualified(
+            lDirAllocator.getLocalPathForWrite(
+                getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
+        localFS.mkdirs(inputTableBaseDir);
+        Path tableDir;
+        for (String inputTable : context.getInputTables()) {
+          tableDir = new Path(inputTableBaseDir, inputTable);
+          if (!localFS.exists(tableDir)) {
+            LOG.info("the directory is created  " + tableDir.toUri());
+            localFS.mkdirs(tableDir);
+          }
         }
       }
+      // for localizing the intermediate data
+      localize(request);
     }
-    // for localizing the intermediate data
-    localize(request);
   }
 
   public QueryUnitAttemptId getTaskId() {
@@ -426,14 +442,21 @@ public class Task {
 
       while(!killed && executor.next() != null) {
       }
-      this.executor.close();
-      reloadInputStats();
-      this.executor = null;
     } catch (Exception e) {
       error = e ;
       LOG.error(e.getMessage(), e);
       aborted = true;
     } finally {
+      if (executor != null) {
+        try {
+          executor.close();
+          reloadInputStats();
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+        this.executor = null;
+      }
+
       context.setProgress(1.0f);
       taskRunnerContext.completedTasksNum.incrementAndGet();
       context.getHashShuffleAppenderManager().finalizeTask(taskId);

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 6cb4bd7..d27fd6d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -26,8 +26,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.eval.EvalNode;
 import org.apache.tajo.engine.planner.enforce.Enforcer;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.query.QueryContext;
@@ -75,16 +77,23 @@ public class TaskAttemptContext {
   private Enforcer enforcer;
   private QueryContext queryContext;
   private WorkerContext workerContext;
+  private ExecutionBlockSharedResource sharedResource;
 
   /** a output volume for each partition */
   private Map<Integer, Long> partitionOutputVolume;
   private HashShuffleAppenderManager hashShuffleAppenderManager;
 
-  public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext, final QueryUnitAttemptId queryId,
+  public TaskAttemptContext(QueryContext queryContext, final WorkerContext workerContext,
+                            final QueryUnitAttemptId queryId,
                             final FragmentProto[] fragments,
                             final Path workDir) {
     this.queryContext = queryContext;
-    this.workerContext = workerContext;
+
+    if (workerContext != null) { // For unit tests
+      this.workerContext = workerContext;
+      this.sharedResource = workerContext.getSharedResource(queryId.getQueryUnitId().getExecutionBlockId());
+    }
+
     this.queryId = queryId;
 
     if (fragments != null) {
@@ -154,6 +163,23 @@ public class TaskAttemptContext {
     return this.enforcer;
   }
 
+  public ExecutionBlockSharedResource getSharedResource() {
+    return sharedResource;
+  }
+
+  public EvalNode compileEval(Schema schema, EvalNode eval) {
+    return sharedResource.compileEval(schema, eval);
+  }
+
+  public EvalNode getPrecompiledEval(Schema schema, EvalNode eval) {
+    if (sharedResource != null) {
+      return sharedResource.getPreCompiledEval(schema, eval);
+    } else {
+      LOG.debug("Shared resource is not initialized. It is NORMAL in unit tests");
+      return eval;
+    }
+  }
+
   public boolean hasResultStats() {
     return resultStats != null;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index cdb1438..e100c48 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -207,6 +207,9 @@ message RunExecutionBlockRequestProto {
     required string nodeId = 4;
     required string containerId = 5;
     optional string queryOutputPath = 6;
+
+    required KeyValueSetProto queryContext = 7;
+    required string planJson = 8;
 }
 
 message ExecutionBlockListProto {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 271ba70..4d9ca67 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -33,6 +33,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.session.Session;
+import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TajoIdUtils;
 
@@ -66,12 +67,15 @@ public class LocalTajoTestingUtility {
   public static QueryUnitAttemptId newQueryUnitAttemptId(MasterPlan plan) {
     return QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(plan.newExecutionBlockId()), 0);
   }
+
   public static Session createDummySession() {
     return new Session(UUID.randomUUID().toString(), dummyUserInfo.getUserName(), TajoConstants.DEFAULT_DATABASE_NAME);
   }
 
   public static QueryContext createDummyContext(TajoConf conf) {
-    return new QueryContext(conf, createDummySession());
+    QueryContext context = new QueryContext(conf, createDummySession());
+    context.putAll(CommonTestingUtil.getSessionVarsForTest().getAllKeyValus());
+    return context;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tajo/blob/7603a3d4/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 7b87112..346fa69 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -92,9 +92,16 @@ public class TajoTestingCluster {
   public TajoTestingCluster(boolean masterHaEMode) {
     this.conf = new TajoConf();
     this.conf.setBoolVar(ConfVars.TAJO_MASTER_HA_ENABLE, masterHaEMode);
+
+    setTestingFlagProperties();
     initPropertiesAndConfigs();
   }
 
+  void setTestingFlagProperties() {
+    System.setProperty(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+    conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE);
+  }
+
   void initPropertiesAndConfigs() {
     if (System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname) != null) {
       String testResourceManager = System.getProperty(ConfVars.RESOURCE_MANAGER_CLASS.varname);
@@ -106,7 +113,6 @@ public class TajoTestingCluster {
 
     this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
         .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
-    conf.set(CommonTestingUtil.TAJO_TEST, "TRUE");
   }
 
 	public TajoConf getConfiguration() {