You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/27 04:01:06 UTC

[5/6] tajo git commit: TAJO-1350: Refactor FilterPushDownRule::visitJoin() into well-defined, small methods. (jihoon)

TAJO-1350: Refactor FilterPushDownRule::visitJoin() into well-defined, small methods. (jihoon)

Closes  #384


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b1e174ee
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b1e174ee
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b1e174ee

Branch: refs/heads/index_support
Commit: b1e174eec4c15142b6fa518b6803579bb0788e8e
Parents: cad5442
Author: Jihoon Son <ji...@apache.org>
Authored: Fri Mar 27 11:49:42 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri Mar 27 11:49:42 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../planner/physical/HashLeftOuterJoinExec.java |   3 +-
 .../apache/tajo/engine/query/TestJoinQuery.java |  23 +-
 .../testComplexJoinsWithCaseWhen.sql            |  11 +
 .../testComplexJoinsWithCaseWhen2.sql           |   9 +
 .../TestJoinQuery/testCrossJoinAndCaseWhen.sql  |  18 -
 .../TestJoinQuery/testInnerJoinAndCaseWhen.sql  |  18 +
 .../TestJoinQuery/testJoinWithOrPredicates.sql  |   6 +
 .../testComplexJoinsWithCaseWhen.result         |  27 ++
 .../testComplexJoinsWithCaseWhen2.result        |  27 ++
 .../testCrossJoinAndCaseWhen.result             |  27 --
 .../testInnerJoinAndCaseWhen.result             |  27 ++
 .../testJoinWithOrPredicates.result             |   4 +
 .../org/apache/tajo/plan/LogicalPlanner.java    |  60 ++-
 .../plan/rewrite/rules/FilterPushDownRule.java  | 383 +++++++++----------
 .../rewrite/rules/ProjectionPushDownRule.java   |   5 +-
 .../org/apache/tajo/plan/util/PlannerUtil.java  |  25 +-
 17 files changed, 428 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 029735f..60cb0f7 100644
--- a/CHANGES
+++ b/CHANGES
@@ -9,6 +9,9 @@ Release 0.11.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1350: Refactor FilterPushDownRule::visitJoin() into well-defined, 
+    small methods. (jihoon)
+
     TAJO-1426: Support "explain global" to get physical plan. (Contributed by
     navis, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
index e78cb20..81ac02c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java
@@ -181,7 +181,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
       boolean satisfiedWithJoinCondition = joinQual.eval(inSchema, frameTuple).isTrue();
 
       // if a composited tuple satisfies with both join filter and join condition
-      if (satisfiedWithFilter && satisfiedWithJoinCondition) {
+      if (satisfiedWithJoinCondition && satisfiedWithFilter) {
         projector.eval(frameTuple, outTuple);
         return outTuple;
       } else {
@@ -195,6 +195,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec {
         // null padding
         Tuple nullPaddedTuple = TupleUtil.createNullPaddedTuple(rightNumCols);
         frameTuple.set(leftTuple, nullPaddedTuple);
+
         projector.eval(frameTuple, outTuple);
         return outTuple;
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
index 9ab32ff..1078943 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinQuery.java
@@ -338,7 +338,21 @@ public class TestJoinQuery extends QueryTestCaseBase {
   }
 
   @Test
-  public void testCrossJoinAndCaseWhen() throws Exception {
+  public void testInnerJoinAndCaseWhen() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testComplexJoinsWithCaseWhen() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
+
+  @Test
+  public void testComplexJoinsWithCaseWhen2() throws Exception {
     ResultSet res = executeQuery();
     assertResultSet(res);
     cleanupQuery(res);
@@ -1170,4 +1184,11 @@ public class TestJoinQuery extends QueryTestCaseBase {
     assertResultSet(res);
     cleanupQuery(res);
   }
+
+  @Test
+  public final void testJoinWithOrPredicates() throws Exception {
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen.sql
new file mode 100644
index 0000000..b2c49a4
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen.sql
@@ -0,0 +1,11 @@
+select
+  r_name,
+  case when
+    s_name is null then 'N/O'
+  else
+    s_name
+  end as s1
+from
+  region inner join nation on n_regionkey = r_regionkey
+  left outer join supplier on s_nationkey = n_nationkey
+order by r_name, s1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen2.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen2.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen2.sql
new file mode 100644
index 0000000..5e2918f
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testComplexJoinsWithCaseWhen2.sql
@@ -0,0 +1,9 @@
+select
+  r_name,
+  case when s_name is null then 'N/O'
+  else s_name end as s1
+from region inner join (
+  select * from nation
+  left outer join supplier on s_nationkey = n_nationkey
+) t on n_regionkey = r_regionkey
+order by r_name, s1;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinAndCaseWhen.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinAndCaseWhen.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinAndCaseWhen.sql
deleted file mode 100644
index d058aba..0000000
--- a/tajo-core/src/test/resources/queries/TestJoinQuery/testCrossJoinAndCaseWhen.sql
+++ /dev/null
@@ -1,18 +0,0 @@
-select
-  r_regionkey,
-  n_regionkey,
-  case
-    when r_regionkey = 1 then 'one'
-    when r_regionkey = 2 then 'two'
-    when r_regionkey = 3 then 'three'
-    when r_regionkey = 4 then 'four'
-    else 'zero'
-  end as cond
-from
-  region,
-  nation
-where
-  r_regionkey = n_regionkey
-order by
-  r_regionkey,
-  n_regionkey
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/queries/TestJoinQuery/testInnerJoinAndCaseWhen.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testInnerJoinAndCaseWhen.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testInnerJoinAndCaseWhen.sql
new file mode 100644
index 0000000..d058aba
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testInnerJoinAndCaseWhen.sql
@@ -0,0 +1,18 @@
+select
+  r_regionkey,
+  n_regionkey,
+  case
+    when r_regionkey = 1 then 'one'
+    when r_regionkey = 2 then 'two'
+    when r_regionkey = 3 then 'three'
+    when r_regionkey = 4 then 'four'
+    else 'zero'
+  end as cond
+from
+  region,
+  nation
+where
+  r_regionkey = n_regionkey
+order by
+  r_regionkey,
+  n_regionkey
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithOrPredicates.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithOrPredicates.sql b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithOrPredicates.sql
new file mode 100644
index 0000000..b388233
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestJoinQuery/testJoinWithOrPredicates.sql
@@ -0,0 +1,6 @@
+select
+  n1.n_nationkey,
+  n1.n_name,
+  n2.n_name
+from nation n1, nation n2 where n1.n_name = n2.n_name and (n1.n_nationkey in (1, 2) or n2.n_nationkey in (2))
+order by n1.n_nationkey;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen.result b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen.result
new file mode 100644
index 0000000..046a7c1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen.result
@@ -0,0 +1,27 @@
+r_name,s1
+-------------------------------
+AFRICA,N/O
+AFRICA,N/O
+AFRICA,N/O
+AFRICA,Supplier#000000002
+AFRICA,Supplier#000000004
+AMERICA,N/O
+AMERICA,N/O
+AMERICA,N/O
+AMERICA,N/O
+AMERICA,Supplier#000000003
+ASIA,N/O
+ASIA,N/O
+ASIA,N/O
+ASIA,N/O
+ASIA,N/O
+EUROPE,N/O
+EUROPE,N/O
+EUROPE,N/O
+EUROPE,N/O
+EUROPE,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen2.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen2.result b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen2.result
new file mode 100644
index 0000000..046a7c1
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testComplexJoinsWithCaseWhen2.result
@@ -0,0 +1,27 @@
+r_name,s1
+-------------------------------
+AFRICA,N/O
+AFRICA,N/O
+AFRICA,N/O
+AFRICA,Supplier#000000002
+AFRICA,Supplier#000000004
+AMERICA,N/O
+AMERICA,N/O
+AMERICA,N/O
+AMERICA,N/O
+AMERICA,Supplier#000000003
+ASIA,N/O
+ASIA,N/O
+ASIA,N/O
+ASIA,N/O
+ASIA,N/O
+EUROPE,N/O
+EUROPE,N/O
+EUROPE,N/O
+EUROPE,N/O
+EUROPE,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
+MIDDLE EAST,N/O
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/results/TestJoinQuery/testCrossJoinAndCaseWhen.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testCrossJoinAndCaseWhen.result b/tajo-core/src/test/resources/results/TestJoinQuery/testCrossJoinAndCaseWhen.result
deleted file mode 100644
index a4c33f8..0000000
--- a/tajo-core/src/test/resources/results/TestJoinQuery/testCrossJoinAndCaseWhen.result
+++ /dev/null
@@ -1,27 +0,0 @@
-r_regionkey,n_regionkey,cond
--------------------------------
-0,0,zero
-0,0,zero
-0,0,zero
-0,0,zero
-0,0,zero
-1,1,one
-1,1,one
-1,1,one
-1,1,one
-1,1,one
-2,2,two
-2,2,two
-2,2,two
-2,2,two
-2,2,two
-3,3,three
-3,3,three
-3,3,three
-3,3,three
-3,3,three
-4,4,four
-4,4,four
-4,4,four
-4,4,four
-4,4,four
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/results/TestJoinQuery/testInnerJoinAndCaseWhen.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testInnerJoinAndCaseWhen.result b/tajo-core/src/test/resources/results/TestJoinQuery/testInnerJoinAndCaseWhen.result
new file mode 100644
index 0000000..a4c33f8
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testInnerJoinAndCaseWhen.result
@@ -0,0 +1,27 @@
+r_regionkey,n_regionkey,cond
+-------------------------------
+0,0,zero
+0,0,zero
+0,0,zero
+0,0,zero
+0,0,zero
+1,1,one
+1,1,one
+1,1,one
+1,1,one
+1,1,one
+2,2,two
+2,2,two
+2,2,two
+2,2,two
+2,2,two
+3,3,three
+3,3,three
+3,3,three
+3,3,three
+3,3,three
+4,4,four
+4,4,four
+4,4,four
+4,4,four
+4,4,four
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-core/src/test/resources/results/TestJoinQuery/testJoinWithOrPredicates.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestJoinQuery/testJoinWithOrPredicates.result b/tajo-core/src/test/resources/results/TestJoinQuery/testJoinWithOrPredicates.result
new file mode 100644
index 0000000..ede3464
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestJoinQuery/testJoinWithOrPredicates.result
@@ -0,0 +1,4 @@
+n_nationkey,n_name,n_name
+-------------------------------
+1,ARGENTINA,ARGENTINA
+2,BRAZIL,BRAZIL
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
index ff3d6c2..5e91b0c 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
@@ -1122,7 +1122,10 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       joinCondition = context.evalOptimizer.optimize(context, evalNode);
     }
 
-    List<String> newlyEvaluatedExprs = getNewlyEvaluatedExprsForJoin(context, joinNode, stack);
+    // If the query involves a subquery, the stack can be empty.
+    // In this case, this join is the top most one within a query block.
+    boolean isTopMostJoin = stack.isEmpty() ? true : stack.peek().getType() != OpType.Join;
+    List<String> newlyEvaluatedExprs = getNewlyEvaluatedExprsForJoin(context, joinNode, isTopMostJoin);
     List<Target> targets = TUtil.newList(PlannerUtil.schemaToTargets(merged));
 
     for (String newAddedExpr : newlyEvaluatedExprs) {
@@ -1141,7 +1144,7 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return joinNode;
   }
 
-  private List<String> getNewlyEvaluatedExprsForJoin(PlanContext context, JoinNode joinNode, Stack<Expr> stack) {
+  private List<String> getNewlyEvaluatedExprsForJoin(PlanContext context, JoinNode joinNode, boolean isTopMostJoin) {
     QueryBlock block = context.queryBlock;
 
     EvalNode evalNode;
@@ -1150,7 +1153,8 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
       NamedExpr namedExpr = it.next();
       try {
         evalNode = exprAnnotator.createEvalNode(context, namedExpr.getExpr(), NameResolvingMode.LEGACY);
-        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, evalNode, joinNode, stack.peek().getType() != OpType.Join)) {
+        // the predicates specified in the on clause are already processed in visitJoin()
+        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(context.queryBlock, evalNode, joinNode, isTopMostJoin)) {
           block.namedExprsMgr.markAsEvaluated(namedExpr.getAlias(), evalNode);
           newlyEvaluatedExprs.add(namedExpr.getAlias());
         }
@@ -1972,6 +1976,52 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return true;
   }
 
+  public static boolean isEvaluatableJoinQual(QueryBlock block, EvalNode evalNode, JoinNode node,
+                                              boolean isOnPredicate, boolean isTopMostJoin) {
+
+    if (checkIfBeEvaluatedAtJoin(block, evalNode, node, isTopMostJoin)) {
+
+      if (isNonEquiThetaJoinQual(block, node, evalNode)) {
+        return false;
+      }
+
+      if (PlannerUtil.isOuterJoin(node.getJoinType())) {
+        /*
+         * For outer joins, only predicates which are specified at the on clause can be evaluated during processing join.
+         * Other predicates from the where clause must be evaluated after the join.
+         * The below code will be modified after improving join operators to keep join filters by themselves (TAJO-1310).
+         */
+        if (!isOnPredicate) {
+          return false;
+        }
+      } else {
+        /*
+         * Only join predicates should be evaluated at join if the join type is inner or cross. (TAJO-1445)
+         */
+        if (!EvalTreeUtil.isJoinQual(block, node.getLeftChild().getOutSchema(), node.getRightChild().getOutSchema(),
+            evalNode, false)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  public static boolean isNonEquiThetaJoinQual(final LogicalPlan.QueryBlock block,
+                                               final JoinNode joinNode,
+                                               final EvalNode evalNode) {
+    if (EvalTreeUtil.isJoinQual(block, joinNode.getLeftChild().getOutSchema(),
+        joinNode.getRightChild().getOutSchema(), evalNode, true) &&
+        evalNode.getType() != EvalType.EQUAL) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   public static boolean checkIfBeEvaluatedAtJoin(QueryBlock block, EvalNode evalNode, JoinNode node,
                                                  boolean isTopMostJoin) {
     Set<Column> columnRefs = EvalTreeUtil.findUniqueColumns(evalNode);
@@ -2004,10 +2054,6 @@ public class LogicalPlanner extends BaseAlgebraVisitor<LogicalPlanner.PlanContex
     return true;
   }
 
-  public static boolean isOuterJoin(JoinType joinType) {
-    return joinType == JoinType.LEFT_OUTER || joinType == JoinType.RIGHT_OUTER || joinType==JoinType.FULL_OUTER;
-  }
-
   public static boolean containsOuterJoin(QueryBlock block) {
     return block.containsJoinType(JoinType.LEFT_OUTER) || block.containsJoinType(JoinType.RIGHT_OUTER) ||
         block.containsJoinType(JoinType.FULL_OUTER);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
index 4cd008a..a6a7c78 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilterPushDownRule.java
@@ -18,7 +18,9 @@
 
 package org.apache.tajo.plan.rewrite.rules;
 
-import com.google.common.collect.*;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.OverridableConf;
@@ -36,7 +38,11 @@ import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor;
 import org.apache.tajo.util.TUtil;
 
-import java.util.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 /**
  * This rule tries to push down all filter conditions into logical nodes as lower as possible.
@@ -48,7 +54,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   private static final String NAME = "FilterPushDown";
 
   static class FilterPushDownContext {
-    Set<EvalNode> pushingDownFilters = new HashSet<EvalNode>();
+    Set<EvalNode> pushingDownFilters = TUtil.newHashSet();
 
     public void clear() {
       pushingDownFilters.clear();
@@ -63,7 +69,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
     public void setToOrigin(Map<EvalNode, EvalNode> evalMap) {
       //evalMap: copy -> origin
-      List<EvalNode> origins = new ArrayList<EvalNode>();
+      List<EvalNode> origins = TUtil.newList();
       for (EvalNode eval : pushingDownFilters) {
         EvalNode origin = evalMap.get(eval);
         if (origin != null) {
@@ -118,7 +124,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   @Override
   public LogicalNode visitFilter(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                  SelectionNode selNode, Stack<LogicalNode> stack) throws PlanningException {
-    context.pushingDownFilters.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
+    context.pushingDownFilters.addAll(TUtil.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(selNode.getQual())));
 
     stack.push(selNode);
     visit(context, plan, block, selNode.getChild(), stack);
@@ -158,159 +164,24 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   public LogicalNode visitJoin(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                JoinNode joinNode,
                                Stack<LogicalNode> stack) throws PlanningException {
-    // here we should stop selection pushdown on the null supplying side(s) of an outer join
-    // get the two operands of the join operation as well as the join type
-    JoinType joinType = joinNode.getJoinType();
-    EvalNode joinQual = joinNode.getJoinQual();
-    if (joinQual != null && LogicalPlanner.isOuterJoin(joinType)) {
-      BinaryEval binaryEval = (BinaryEval) joinQual;
-      // if both are fields
-      if (binaryEval.getLeftExpr().getType() == EvalType.FIELD &&
-          binaryEval.getRightExpr().getType() == EvalType.FIELD) {
-
-        String leftTableName = ((FieldEval) binaryEval.getLeftExpr()).getQualifier();
-        String rightTableName = ((FieldEval) binaryEval.getRightExpr()).getQualifier();
-        List<String> nullSuppliers = Lists.newArrayList();
-        Set<String> leftTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
-            joinNode.getLeftChild()));
-        Set<String> rightTableSet = Sets.newHashSet(PlannerUtil.getRelationLineageWithinQueryBlock(plan,
-            joinNode.getRightChild()));
-
-        // some verification
-        if (joinType == JoinType.FULL_OUTER) {
-          nullSuppliers.add(leftTableName);
-          nullSuppliers.add(rightTableName);
-
-          // verify that these null suppliers are indeed in the left and right sets
-          if (!rightTableSet.contains(nullSuppliers.get(0)) && !leftTableSet.contains(nullSuppliers.get(0))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-          if (!rightTableSet.contains(nullSuppliers.get(1)) && !leftTableSet.contains(nullSuppliers.get(1))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-
-        } else if (joinType == JoinType.LEFT_OUTER) {
-          nullSuppliers.add(((RelationNode)joinNode.getRightChild()).getCanonicalName());
-          //verify that this null supplier is indeed in the right sub-tree
-          if (!rightTableSet.contains(nullSuppliers.get(0))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-        } else if (joinType == JoinType.RIGHT_OUTER) {
-          if (((RelationNode)joinNode.getRightChild()).getCanonicalName().equals(rightTableName)) {
-            nullSuppliers.add(leftTableName);
-          } else {
-            nullSuppliers.add(rightTableName);
-          }
-
-          // verify that this null supplier is indeed in the left sub-tree
-          if (!leftTableSet.contains(nullSuppliers.get(0))) {
-            throw new InvalidQueryException("Incorrect Logical Query Plan with regard to outer join");
-          }
-        }
-      }
-    }
-
-    /* non-equi filter should not be push down as a join qualifier until theta join is implemented
-     * TODO this code SHOULD be restored after TAJO-742 is resolved. */
-    List<EvalNode> thetaJoinFilter = new ArrayList<EvalNode>();
-    for (EvalNode eachEval: context.pushingDownFilters) {
-      if (eachEval.getType() != EvalType.EQUAL) {
-        if (EvalTreeUtil.isJoinQual(block,
-            joinNode.getLeftChild().getOutSchema(),
-            joinNode.getRightChild().getOutSchema(),
-            eachEval, true)) {
-          thetaJoinFilter.add(eachEval);
-        }
-      }
-    }
-    context.pushingDownFilters.removeAll(thetaJoinFilter);
-
-    // get evals from ON clause
-    List<EvalNode> onConditions = new ArrayList<EvalNode>();
+    Set<EvalNode> onPredicates = TUtil.newHashSet();
     if (joinNode.hasJoinQual()) {
-      onConditions.addAll(Sets.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
-    }
-
-    boolean isTopMostJoin = stack.peek().getType() != NodeType.JOIN;
-
-    List<EvalNode> outerJoinPredicationEvals = new ArrayList<EvalNode>();
-    List<EvalNode> outerJoinFilterEvalsExcludePredication = new ArrayList<EvalNode>();
-    if (LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
-      // TAJO-853
-      // In the case of top most JOIN, all filters except JOIN condition aren't pushed down.
-      // That filters are processed by SELECTION NODE.
-      Set<String> nullSupplyingTableNameSet;
-      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
-        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
-      } else {
-        nullSupplyingTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
-      }
-
-      Set<String> preservedTableNameSet;
-      if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
-        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getRightChild()));
-      } else {
-        preservedTableNameSet = TUtil.newHashSet(PlannerUtil.getRelationLineage(joinNode.getLeftChild()));
-      }
-
-      List<EvalNode> removedFromFilter = new ArrayList<EvalNode>();
-      for (EvalNode eachEval: context.pushingDownFilters) {
-        if (EvalTreeUtil.isJoinQual(block,
-            joinNode.getLeftChild().getOutSchema(),
-            joinNode.getRightChild().getOutSchema(),
-            eachEval, true)) {
-          outerJoinPredicationEvals.add(eachEval);
-          removedFromFilter.add(eachEval);
-        } else {
-          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachEval);
-          boolean canPushDown = true;
-          for (Column eachColumn: columns) {
-            if (nullSupplyingTableNameSet.contains(eachColumn.getQualifier())) {
-              canPushDown = false;
-              break;
-            }
-          }
-          if (!canPushDown) {
-            outerJoinFilterEvalsExcludePredication.add(eachEval);
-            removedFromFilter.add(eachEval);
-          }
-        }
-      }
-
-      context.pushingDownFilters.removeAll(removedFromFilter);
-
-      for (EvalNode eachOnEval: onConditions) {
-        if (EvalTreeUtil.isJoinQual(eachOnEval, true)) {
-          // If join condition, processing in the JoinNode.
-          outerJoinPredicationEvals.add(eachOnEval);
-        } else {
-          // If Eval has a column which belong to Preserved Row table, not using to push down but using JoinCondition
-          Set<Column> columns = EvalTreeUtil.findUniqueColumns(eachOnEval);
-          boolean canPushDown = true;
-          for (Column eachColumn: columns) {
-            if (preservedTableNameSet.contains(eachColumn.getQualifier())) {
-              canPushDown = false;
-              break;
-            }
-          }
-          if (canPushDown) {
-            context.pushingDownFilters.add(eachOnEval);
-          } else {
-            outerJoinPredicationEvals.add(eachOnEval);
-          }
-        }
-      }
-    } else {
-      context.pushingDownFilters.addAll(onConditions);
+      onPredicates.addAll(TUtil.newHashSet(AlgebraicUtil.toConjunctiveNormalFormArray(joinNode.getJoinQual())));
     }
+    // we assume all the quals in pushingDownFilters as where predicates
+    Set<EvalNode> nonPushableQuals = extractNonPushableJoinQuals(plan, block, joinNode, onPredicates,
+        context.pushingDownFilters);
+    // add every predicate and remove non-pushable ones
+    context.pushingDownFilters.addAll(onPredicates);
+    context.pushingDownFilters.removeAll(nonPushableQuals);
 
     LogicalNode left = joinNode.getLeftChild();
     LogicalNode right = joinNode.getRightChild();
 
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+    List<EvalNode> notMatched = TUtil.newList();
     // Join's input schema = right child output columns + left child output columns
     Map<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(context, block, joinNode, left, notMatched,
-        null, true, 0);
+        null, 0);
     context.setFiltersTobePushed(transformedMap.keySet());
     visit(context, plan, block, left, stack);
 
@@ -318,9 +189,9 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     context.addFiltersTobePushed(notMatched);
 
     notMatched.clear();
-    transformedMap = findCanPushdownAndTransform(context, block, joinNode, right, notMatched, null, true,
+    transformedMap = findCanPushdownAndTransform(context, block, joinNode, right, notMatched, null,
         left.getOutSchema().size());
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    context.setFiltersTobePushed(transformedMap.keySet());
 
     visit(context, plan, block, right, stack);
 
@@ -328,14 +199,18 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     context.addFiltersTobePushed(notMatched);
 
     notMatched.clear();
-    List<EvalNode> matched = Lists.newArrayList();
-    if(LogicalPlanner.isOuterJoin(joinNode.getJoinType())) {
-      matched.addAll(outerJoinPredicationEvals);
-    } else {
-      for (EvalNode eval : context.pushingDownFilters) {
-        if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, eval, joinNode, isTopMostJoin)) {
-          matched.add(eval);
-        }
+    context.addFiltersTobePushed(nonPushableQuals);
+    List<EvalNode> matched = TUtil.newList();
+
+    // If the query involves a subquery, the stack can be empty.
+    // In this case, this join is the top most one within a query block.
+    boolean isTopMostJoin = stack.isEmpty() ? true : stack.peek().getType() != NodeType.JOIN;
+
+    for (EvalNode evalNode : context.pushingDownFilters) {
+      // TODO: currently, non-equi theta join is not supported yet.
+      if (LogicalPlanner.isEvaluatableJoinQual(block, evalNode, joinNode, onPredicates.contains(evalNode),
+          isTopMostJoin)) {
+        matched.add(evalNode);
       }
     }
 
@@ -355,20 +230,150 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
       if (joinNode.getJoinType() == JoinType.CROSS) {
         joinNode.setJoinType(JoinType.INNER);
       }
-      context.pushingDownFilters.removeAll(matched);
     }
 
-    context.pushingDownFilters.addAll(outerJoinFilterEvalsExcludePredication);
-    context.pushingDownFilters.addAll(thetaJoinFilter);
+    context.pushingDownFilters.removeAll(matched);
     return joinNode;
   }
 
+  private static Set<EvalNode> extractNonPushableJoinQuals(final LogicalPlan plan,
+                                                           final LogicalPlan.QueryBlock block,
+                                                           final JoinNode joinNode,
+                                                           final Set<EvalNode> onPredicates,
+                                                           final Set<EvalNode> wherePredicates)
+      throws PlanningException {
+    Set<EvalNode> nonPushableQuals = TUtil.newHashSet();
+    // TODO: non-equi theta join quals must not be pushed until TAJO-742 is resolved.
+    nonPushableQuals.addAll(extractNonEquiThetaJoinQuals(wherePredicates, block, joinNode));
+
+    // for outer joins
+    if (PlannerUtil.isOuterJoin(joinNode.getJoinType())) {
+      nonPushableQuals.addAll(extractNonPushableOuterJoinQuals(plan, onPredicates, wherePredicates, joinNode));
+    }
+    return nonPushableQuals;
+  }
+
+  /**
+   * For outer joins, pushable predicates can be decided based on their locations in the SQL and types of referencing
+   * relations.
+   *
+   * <h3>Table types</h3>
+   * <ul>
+   *   <li>Preserved Row table : The preserved row table refers to the table that preserves rows when there is no match
+   *   in the join operation. Therefore, all rows from the preserved row table that qualify against the WHERE clause
+   *   will be returned, regardless of whether there is a matched row in the join. For a left/right table, the preserved
+   *   row table is the left/right table. For a full outer join, both tables are preserved row tables.</li>
+   *   <li>Null Supplying table : The NULL-supplying table supplies NULLs when there is an unmatched row. Any column
+   *   from the NULL-supplying table referred to in the SELECT list or subsequent WHERE or ON clause will contain NULL
+   *   if there was no match in the join operation. For a left/right outer join, the NULL-supplying
+   *   table is the right/left table. For a full outer join, both tables are NULL-supplying
+   *   table. In a full outer join, both tables can preserve rows, and also can supply NULLs. This is significant,
+   *   because there are rules that apply to purely preserved row tables that do not apply if the table can also supply
+   *   NULLs.</li>
+   * </ul>
+   *
+   * <h3>Predicate types</h3>
+   * <ul>
+   *   <li>During Join predicate : A predicate that is in the JOIN ON clause.</li>
+   *   <li>After Join predicate : A predicate that is in the WHERE clause.</li>
+   * </ul>
+   *
+   * <h3>Predicate Pushdown Rules</h3>
+   * <ol>
+   *   <li>During Join predicates cannot be pushed past Preserved Row tables.</li>
+   *   <li>After Join predicates cannot be pushed past Null Supplying tables.</li>
+   * </ol>
+   */
+  private static Set<EvalNode> extractNonPushableOuterJoinQuals(final LogicalPlan plan,
+                                                                final Set<EvalNode> onPredicates,
+                                                                final Set<EvalNode> wherePredicates,
+                                                                final JoinNode joinNode) throws PlanningException {
+    Set<String> nullSupplyingTableNameSet = TUtil.newHashSet();
+    Set<String> preservedTableNameSet = TUtil.newHashSet();
+    String leftRelation = PlannerUtil.getTopRelationInLineage(plan, joinNode.getLeftChild());
+    String rightRelation = PlannerUtil.getTopRelationInLineage(plan, joinNode.getRightChild());
+
+    if (joinNode.getJoinType() == JoinType.LEFT_OUTER) {
+      nullSupplyingTableNameSet.add(rightRelation);
+      preservedTableNameSet.add(leftRelation);
+    } else if (joinNode.getJoinType() == JoinType.RIGHT_OUTER) {
+      nullSupplyingTableNameSet.add(leftRelation);
+      preservedTableNameSet.add(rightRelation);
+    } else {
+      // full outer join
+      preservedTableNameSet.add(leftRelation);
+      preservedTableNameSet.add(rightRelation);
+      nullSupplyingTableNameSet.add(leftRelation);
+      nullSupplyingTableNameSet.add(rightRelation);
+    }
+
+    Set<EvalNode> nonPushableQuals = TUtil.newHashSet();
+    for (EvalNode eachQual : onPredicates) {
+      for (String relName : preservedTableNameSet) {
+        if (isEvalNeedRelation(eachQual, relName)) {
+          nonPushableQuals.add(eachQual);
+        }
+      }
+    }
+
+    for (EvalNode eachQual : wherePredicates) {
+      for (String relName : nullSupplyingTableNameSet) {
+        if (isEvalNeedRelation(eachQual, relName)) {
+          nonPushableQuals.add(eachQual);
+        }
+      }
+    }
+
+    return nonPushableQuals;
+  }
+
+  private static boolean isEvalNeedRelation(final EvalNode evalNode, final String relationName) {
+    Set<Column> columns = EvalTreeUtil.findUniqueColumns(evalNode);
+    for (Column column : columns) {
+      if (isColumnFromRelation(column, relationName)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean isColumnFromRelation(final Column column, final String relationName) {
+    if (relationName.equals(column.getQualifier())) {
+      return true;
+    }
+    return false;
+  }
+
+  private static boolean isNonEquiThetaJoinQual(final LogicalPlan.QueryBlock block,
+                                                final JoinNode joinNode,
+                                                final EvalNode evalNode) {
+    if (EvalTreeUtil.isJoinQual(block, joinNode.getLeftChild().getOutSchema(),
+        joinNode.getRightChild().getOutSchema(), evalNode, true) &&
+        evalNode.getType() != EvalType.EQUAL) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private static List<EvalNode> extractNonEquiThetaJoinQuals(final Set<EvalNode> predicates,
+                                                             final LogicalPlan.QueryBlock block,
+                                                             final JoinNode joinNode) {
+    List<EvalNode> nonEquiThetaJoinQuals = TUtil.newList();
+    for (EvalNode eachEval: predicates) {
+      if (isNonEquiThetaJoinQual(block, joinNode, eachEval)) {
+        nonEquiThetaJoinQuals.add(eachEval);
+      }
+    }
+    return nonEquiThetaJoinQuals;
+  }
+
   private Map<EvalNode, EvalNode> transformEvalsWidthByPassNode(
       Collection<EvalNode> originEvals, LogicalPlan plan,
       LogicalPlan.QueryBlock block,
       LogicalNode node, LogicalNode childNode) throws PlanningException {
     // transformed -> pushingDownFilters
-    Map<EvalNode, EvalNode> transformedMap = new HashMap<EvalNode, EvalNode>();
+    Map<EvalNode, EvalNode> transformedMap = TUtil.newHashMap();
 
     if (originEvals.isEmpty()) {
       return transformedMap;
@@ -426,7 +431,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     }
 
     // node in column -> child out column
-    Map<String, String> columnMap = new HashMap<String, String>();
+    Map<String, String> columnMap = TUtil.newHashMap();
 
     for (int i = 0; i < node.getInSchema().size(); i++) {
       String inColumnName = node.getInSchema().getColumn(i).getQualifiedName();
@@ -474,7 +479,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   @Override
   public LogicalNode visitTableSubQuery(FilterPushDownContext context, LogicalPlan plan, LogicalPlan.QueryBlock block,
                                         TableSubQueryNode node, Stack<LogicalNode> stack) throws PlanningException {
-    List<EvalNode> matched = Lists.newArrayList();
+    List<EvalNode> matched = TUtil.newList();
     for (EvalNode eval : context.pushingDownFilters) {
       if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, node)) {
         matched.add(eval);
@@ -484,8 +489,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     // transformed -> pushingDownFilters
     Map<EvalNode, EvalNode> transformedMap =
         transformEvalsWidthByPassNode(matched, plan, block, node, node.getSubQuery());
-
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    context.setFiltersTobePushed(transformedMap.keySet());
     visit(context, plan, plan.getBlock(node.getSubQuery()));
     context.setToOrigin(transformedMap);
 
@@ -498,11 +502,11 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
                                 Stack<LogicalNode> stack) throws PlanningException {
     LogicalNode leftNode = unionNode.getLeftChild();
 
-    List<EvalNode> origins = new ArrayList<EvalNode>(context.pushingDownFilters);
+    List<EvalNode> origins = TUtil.newList(context.pushingDownFilters);
 
     // transformed -> pushingDownFilters
     Map<EvalNode, EvalNode> transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, leftNode);
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    context.setFiltersTobePushed(transformedMap.keySet());
     visit(context, plan, plan.getBlock(leftNode));
 
     if (!context.pushingDownFilters.isEmpty()) {
@@ -511,7 +515,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
     LogicalNode rightNode = unionNode.getRightChild();
     transformedMap = transformEvalsWidthByPassNode(origins, plan, block, unionNode, rightNode);
-    context.setFiltersTobePushed(new HashSet<EvalNode>(transformedMap.keySet()));
+    context.setFiltersTobePushed(transformedMap.keySet());
     visit(context, plan, plan.getBlock(rightNode), rightNode, stack);
 
     if (!context.pushingDownFilters.isEmpty()) {
@@ -531,11 +535,11 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
                                      Stack<LogicalNode> stack) throws PlanningException {
     LogicalNode childNode = projectionNode.getChild();
 
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+    List<EvalNode> notMatched = TUtil.newList();
 
     //copy -> origin
     BiMap<EvalNode, EvalNode> transformedMap = findCanPushdownAndTransform(
-        context, block,projectionNode, childNode, notMatched, null, false, 0);
+        context, block,projectionNode, childNode, notMatched, null, 0);
 
     context.setFiltersTobePushed(transformedMap.keySet());
 
@@ -583,7 +587,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   }
 
   private Collection<EvalNode> reverseTransform(BiMap<EvalNode, EvalNode> map, Set<EvalNode> remainFilters) {
-    Set<EvalNode> reversed = Sets.newHashSet();
+    Set<EvalNode> reversed = TUtil.newHashSet();
     for (EvalNode evalNode : remainFilters) {
       reversed.add(map.get(evalNode));
     }
@@ -593,10 +597,9 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   private BiMap<EvalNode, EvalNode> findCanPushdownAndTransform(
       FilterPushDownContext context, LogicalPlan.QueryBlock block, Projectable node,
       LogicalNode childNode, List<EvalNode> notMatched,
-      Set<String> partitionColumns,
-      boolean ignoreJoin, int columnOffset) throws PlanningException {
+      Set<String> partitionColumns, int columnOffset) throws PlanningException {
     // canonical name -> target
-    Map<String, Target> nodeTargetMap = new HashMap<String, Target>();
+    Map<String, Target> nodeTargetMap = TUtil.newHashMap();
     for (Target target : node.getTargets()) {
       nodeTargetMap.put(target.getCanonicalName(), target);
     }
@@ -605,10 +608,6 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
     BiMap<EvalNode, EvalNode> matched = HashBiMap.create();
 
     for (EvalNode eval : context.pushingDownFilters) {
-      if (ignoreJoin && EvalTreeUtil.isJoinQual(block, null, null, eval, true)) {
-        notMatched.add(eval);
-        continue;
-      }
       // If all column is field eval, can push down.
       Set<Column> evalColumns = EvalTreeUtil.findUniqueColumns(eval);
       boolean columnMatched = true;
@@ -725,16 +724,16 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
                                        HavingNode havingNode,
                                        GroupbyNode groupByNode) throws PlanningException {
     // find aggregation column
-    Set<Column> groupingColumns = new HashSet<Column>(Arrays.asList(groupByNode.getGroupingColumns()));
-    Set<String> aggrFunctionOutColumns = new HashSet<String>();
+    Set<Column> groupingColumns = TUtil.newHashSet(groupByNode.getGroupingColumns());
+    Set<String> aggrFunctionOutColumns = TUtil.newHashSet();
     for (Column column : groupByNode.getOutSchema().getColumns()) {
       if (!groupingColumns.contains(column)) {
         aggrFunctionOutColumns.add(column.getQualifiedName());
       }
     }
 
-    List<EvalNode> aggrEvalOrigins = new ArrayList<EvalNode>();
-    List<EvalNode> aggrEvals = new ArrayList<EvalNode>();
+    List<EvalNode> aggrEvalOrigins = TUtil.newList();
+    List<EvalNode> aggrEvals = TUtil.newList();
 
     for (EvalNode eval : context.pushingDownFilters) {
       EvalNode copy = null;
@@ -818,10 +817,10 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
       context.pushingDownFilters.removeAll(aggrEvals);
     }
 
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+    List<EvalNode> notMatched = TUtil.newList();
     // transform
     Map<EvalNode, EvalNode> transformed =
-        findCanPushdownAndTransform(context, block, groupbyNode,groupbyNode.getChild(), notMatched, null, false, 0);
+        findCanPushdownAndTransform(context, block, groupbyNode,groupbyNode.getChild(), notMatched, null, 0);
 
     context.setFiltersTobePushed(transformed.keySet());
     LogicalNode current = super.visitGroupBy(context, plan, block, groupbyNode, stack);
@@ -836,10 +835,10 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
   public LogicalNode visitScan(FilterPushDownContext context, LogicalPlan plan,
                                LogicalPlan.QueryBlock block, ScanNode scanNode,
                                Stack<LogicalNode> stack) throws PlanningException {
-    List<EvalNode> matched = Lists.newArrayList();
+    List<EvalNode> matched = TUtil.newList();
 
     // find partition column and check matching
-    Set<String> partitionColumns = new HashSet<String>();
+    Set<String> partitionColumns = TUtil.newHashSet();
     TableDesc table = scanNode.getTableDesc();
     boolean hasQualifiedName = false;
     if (table.hasPartition()) {
@@ -848,7 +847,7 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
         hasQualifiedName = c.hasQualifier();
       }
     }
-    Set<EvalNode> partitionEvals = new HashSet<EvalNode>();
+    Set<EvalNode> partitionEvals = TUtil.newHashSet();
     for (EvalNode eval : context.pushingDownFilters) {
       if (table.hasPartition()) {
         Set<Column> columns = EvalTreeUtil.findUniqueColumns(eval);
@@ -882,11 +881,11 @@ public class FilterPushDownRule extends BasicLogicalPlanVisitor<FilterPushDownCo
 
     context.pushingDownFilters.removeAll(partitionEvals);
 
-    List<EvalNode> notMatched = new ArrayList<EvalNode>();
+    List<EvalNode> notMatched = TUtil.newList();
 
     // transform
     Map<EvalNode, EvalNode> transformed =
-        findCanPushdownAndTransform(context, block, scanNode, null, notMatched, partitionColumns, true, 0);
+        findCanPushdownAndTransform(context, block, scanNode, null, notMatched, partitionColumns, 0);
 
     for (EvalNode eval : transformed.keySet()) {
       if (LogicalPlanner.checkIfBeEvaluatedAtRelation(block, eval, scanNode)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
index abd2814..9a6e625 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/ProjectionPushDownRule.java
@@ -850,7 +850,8 @@ public class ProjectionPushDownRule extends
     // So, we should prevent dividing the binary operator into more subexpressions.
     if (term.getType() != EvalType.FIELD &&
         !(term instanceof BinaryEval) &&
-        !(term.getType() == EvalType.ROW_CONSTANT)) {
+        term.getType() != EvalType.ROW_CONSTANT &&
+        term.getType() != EvalType.CONST) {
       String refName = ctx.addExpr(term);
       EvalTreeUtil.replace(cnf, term, new FieldEval(refName, term.getValueType()));
     }
@@ -914,6 +915,8 @@ public class ProjectionPushDownRule extends
 
       if (context.targetListMgr.isEvaluated(referenceName)) {
         Target fieldReference = new Target(new FieldEval(target.getNamedColumn()));
+        // here, we assume that every exprs are specified at the on clause
+        // because all filters have been moved to appropriate logical nodes during the filter push down phase
         if (LogicalPlanner.checkIfBeEvaluatedAtJoin(block, fieldReference.getEvalTree(), node,
             stack.peek().getType() != NodeType.JOIN)) {
           projectedTargets.add(fieldReference);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b1e174ee/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
index b09fc9e..763f938 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/util/PlannerUtil.java
@@ -218,6 +218,16 @@ public class PlannerUtil {
     return tableNames;
   }
 
+  public static String getTopRelationInLineage(LogicalPlan plan, LogicalNode from) throws PlanningException {
+    RelationFinderVisitor visitor = new RelationFinderVisitor(true);
+    visitor.visit(null, plan, null, from, new Stack<LogicalNode>());
+    if (visitor.getFoundRelations().isEmpty()) {
+      return null;
+    } else {
+      return visitor.getFoundRelations().iterator().next();
+    }
+  }
+
   /**
    * Get all RelationNodes which are descendant of a given LogicalNode.
    * The finding is restricted within a query block.
@@ -227,13 +237,18 @@ public class PlannerUtil {
    */
   public static Collection<String> getRelationLineageWithinQueryBlock(LogicalPlan plan, LogicalNode from)
       throws PlanningException {
-    RelationFinderVisitor visitor = new RelationFinderVisitor();
+    RelationFinderVisitor visitor = new RelationFinderVisitor(false);
     visitor.visit(null, plan, null, from, new Stack<LogicalNode>());
     return visitor.getFoundRelations();
   }
 
   public static class RelationFinderVisitor extends BasicLogicalPlanVisitor<Object, LogicalNode> {
     private Set<String> foundRelNameSet = Sets.newHashSet();
+    private boolean topOnly = false;
+
+    public RelationFinderVisitor(boolean topOnly) {
+      this.topOnly = topOnly;
+    }
 
     public Set<String> getFoundRelations() {
       return foundRelNameSet;
@@ -242,6 +257,10 @@ public class PlannerUtil {
     @Override
     public LogicalNode visit(Object context, LogicalPlan plan, @Nullable LogicalPlan.QueryBlock block, LogicalNode node,
                              Stack<LogicalNode> stack) throws PlanningException {
+      if (topOnly && foundRelNameSet.size() > 0) {
+        return node;
+      }
+
       if (node.getType() != NodeType.TABLE_SUBQUERY) {
         super.visit(context, plan, block, node, stack);
       }
@@ -759,6 +778,10 @@ public class PlannerUtil {
     return joinType == JoinType.INNER;
   }
 
+  public static boolean isOuterJoin(JoinType joinType) {
+    return joinType == JoinType.LEFT_OUTER || joinType == JoinType.RIGHT_OUTER || joinType==JoinType.FULL_OUTER;
+  }
+
   public static boolean existsAggregationFunction(Expr expr) throws PlanningException {
     AggregationFunctionFinder finder = new AggregationFunctionFinder();
     AggFunctionFoundResult result = new AggFunctionFoundResult();