You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/01/28 05:46:45 UTC

[6/7] incubator-asterixdb git commit: ASTERIXDB-1005, ASTERIXDB-1263: Clean up subplan flattening: 1. Inline NestedTupleSource and remove SubplanOperator for special cases that join operators inside the SubplanOperator can be re-targeted for correl

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index c5e99b2..d88b69f 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -27,6 +27,7 @@ import java.util.Set;
 
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -46,28 +47,33 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
-
-/***
- * This rule inlines (deep copies) a SubplanOperator's input query plan to
- * replace its NestedTupleSources.
- * Then, the SubplanOperator is replaced by:
- * 1. a LeftOuterOperatorJoin between the SubplanOperator's input operator and the
- * SubplanOperator's root operator's input.
- * 2. and on top of the LeftOuterJoinOperator, a GroupByOperaptor in which the
- * nested plan consists of the SubplanOperator's root operator.
- */
-/*
-This is an abstract example for this rule:
 
+import com.google.common.collect.ImmutableSet;
+
+/*
+This rule removes SubplanOperators containing DataScan, InnerJoin, LeftOuterJoin, UnionAll or Distinct. Given a qualified SubplanOperator called S1,
+let's call its input operator O1.
+
+General Cases
+We have the following rewritings for general cases:
+R1. Replace all NestedTupleSourceOperators in S1 with deep-copies (with new variables) of the query plan rooted at O1;
+R2. Add a LeftOuterJoinOperator (let's call it LJ) between O1 and the SubplanOperator's root operator's input (let's call it SO1),
+    where O1 is the left branch and SO1 is the right branch;
+R3. The deep copy of the primary key variables in O1 should be preserved from an inlined NestedTupleSourceOperator to SO1.
+    The join condition of LJ is the equality between the primary key variables in O1 and its deep copied version at SO1;
+R4. A variable v indicating non-match tuples is assigned to TRUE between LJ and SO1;
+R5. On top of LJ, add a GroupByOperator in which the nested plan consists of S1's root operator, i.e., an aggregate operator.
+    Below the aggregate, there is a not-null filter on variable v. The group key is the primary key variables in O1.
+
+This is an abstract example for the rewriting mechanism described above:
 Before rewriting:
 --Op1
   --Subplan{
@@ -96,190 +102,157 @@ After rewriting:
              .....
                --Deepcopy_The_Plan_Rooted_At_InputOp_With_New_Variables(InputOp)
 
-In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp,
-while v_rc_1, ..., v_rc_n are their corresponding variables populated from the deepcopy of InputOp.
-("Covering" variables form a set of variables that can imply all live variables.)
-v_l1, ....v_ln in the decoration part of the added group-by operator are all
-live variables at InputOp except the covering variables v_lc_1, ..., v_lc_n.
-
-TODO(buyingyi): the rewritten plan is wrong when there are duplicate tuples from InputOp: ASTERIXDB-1168.
-
-Here are two concrete examples. (The top child of a join operator is the outer branch.)
----------- Example 1 -----------
-FINE: >>>> Before plan
-distribute result [%0->$$27] -- |UNPARTITIONED|
-  project ([$$27]) -- |UNPARTITIONED|
-    assign [$$27] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$37, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$6]] -- |UNPARTITIONED|
-      unnest $$6 <- function-call: asterix:scan-collection, Args:[%0->$$26] -- |UNPARTITIONED|
-        subplan {
-                  aggregate [$$26] <- [function-call: asterix:listify, Args:[%0->$$22]] -- |UNPARTITIONED|
-                    join (TRUE) -- |UNPARTITIONED|
-                      select (%0->$$21) -- |UNPARTITIONED|
-                        group by ([$$30 := %0->$$35]) decor ([%0->$$5; %0->$$7; %0->$$8; %0->$$31]) {
-                                  aggregate [$$21] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
-                                    select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$34]]) -- |UNPARTITIONED|
-                                      nested tuple source -- |UNPARTITIONED|
-                               } -- |UNPARTITIONED|
-                          left outer join (function-call: algebricks:eq, Args:[%0->$$36, %0->$$7]) -- |UNPARTITIONED|
-                            data-scan []<-[$$31, $$8] <- emergencyTest:CHPReports -- |UNPARTITIONED|
-                              nested tuple source -- |UNPARTITIONED|
-                            assign [$$34] <- [TRUE] -- |UNPARTITIONED|
-                              assign [$$36] <- [function-call: asterix:field-access-by-index, Args:[%0->$$10, AInt32: {1}]] -- |UNPARTITIONED|
-                                data-scan []<-[$$32, $$10] <- emergencyTest:userLocations -- |UNPARTITIONED|
-                                  empty-tuple-source -- |UNPARTITIONED|
-                      assign [$$22] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$25]] -- |UNPARTITIONED|
-                        aggregate [$$25] <- [function-call: asterix:listify, Args:[%0->$$24]] -- |UNPARTITIONED|
-                          assign [$$24] <- [function-call: asterix:field-access-by-index, Args:[%0->$$11, AInt32: {1}]] -- |UNPARTITIONED|
-                            data-scan []<-[$$33, $$11] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
-                              empty-tuple-source -- |UNPARTITIONED|
-               } -- |UNPARTITIONED|
-          assign [$$7] <- [function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {1}]] -- |UNPARTITIONED|
-            assign [$$37] <- [function-call: asterix:field-access-by-name, Args:[%0->$$5, AString: {subscription-id}]] -- |UNPARTITIONED|
-              data-scan []<-[$$35, $$5] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
-                empty-tuple-source -- |UNPARTITIONED|
-
+In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, while v_rc_1, ..., v_rc_n are their corresponding variables populated from the deepcopy of InputOp.
+"Covering" variables form a set of variables that can imply all live variables. v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables
+at InputOp except the covering variables v_lc_1, ..., v_lc_n.  In the current implementation, we use "covering" variables as primary key variables. In the next version, we
+will use the real primary key variables, which will fix ASTERIXDB-1168.
 
-Dec 22, 2015 4:39:22 PM org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController printRuleApplication
-FINE: >>>> After plan
-distribute result [%0->$$27] -- |UNPARTITIONED|
-  project ([$$27]) -- |UNPARTITIONED|
-    assign [$$27] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$37, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$6]] -- |UNPARTITIONED|
-      unnest $$6 <- function-call: asterix:scan-collection, Args:[%0->$$26] -- |UNPARTITIONED|
-        group by ([$$43 := %0->$$35]) decor ([%0->$$5; %0->$$37; %0->$$7]) {
-                  aggregate [$$26] <- [function-call: asterix:listify, Args:[%0->$$22]] -- |UNPARTITIONED|
-                    select (function-call: algebricks:not-null, Args:[%0->$$42]) -- |UNPARTITIONED|
+Here is a concrete example of the general case rewriting (optimizerts/queries/nested_loj4.aql).
+Before plan:
+distribute result [%0->$$13] -- |UNPARTITIONED|
+  project ([$$13]) -- |UNPARTITIONED|
+    assign [$$13] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$12]] -- |UNPARTITIONED|
+      subplan {
+                aggregate [$$12] <- [function-call: asterix:listify, Args:[%0->$$1]] -- |UNPARTITIONED|
+                  join (function-call: algebricks:eq, Args:[%0->$$16, %0->$$14]) -- |UNPARTITIONED|
+                    select (function-call: algebricks:eq, Args:[%0->$$18, AInt64: {5}]) -- |UNPARTITIONED|
                       nested tuple source -- |UNPARTITIONED|
-               } -- |UNPARTITIONED|
-          left outer join (function-call: algebricks:eq, Args:[%0->$$35, %0->$$30]) -- |UNPARTITIONED|
-            assign [$$7] <- [function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {1}]] -- |UNPARTITIONED|
-              assign [$$37] <- [function-call: asterix:field-access-by-name, Args:[%0->$$5, AString: {subscription-id}]] -- |UNPARTITIONED|
-                data-scan []<-[$$35, $$5] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
-                  empty-tuple-source -- |UNPARTITIONED|
-            assign [$$42] <- [TRUE] -- |UNPARTITIONED|
-              join (TRUE) -- |UNPARTITIONED|
-                select (%0->$$21) -- |UNPARTITIONED|
-                  group by ([$$30 := %0->$$41]) decor ([%0->$$39; %0->$$38; %0->$$8; %0->$$31]) {
-                            aggregate [$$21] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
-                              select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$34]]) -- |UNPARTITIONED|
-                                nested tuple source -- |UNPARTITIONED|
-                         } -- |UNPARTITIONED|
-                    left outer join (function-call: algebricks:eq, Args:[%0->$$36, %0->$$38]) -- |UNPARTITIONED|
-                      data-scan []<-[$$31, $$8] <- emergencyTest:CHPReports -- |UNPARTITIONED|
-                        assign [$$38] <- [function-call: asterix:field-access-by-index, Args:[%0->$$39, AInt32: {1}]] -- |UNPARTITIONED|
-                          assign [$$40] <- [function-call: asterix:field-access-by-name, Args:[%0->$$39, AString: {subscription-id}]] -- |UNPARTITIONED|
-                            data-scan []<-[$$41, $$39] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
-                              empty-tuple-source -- |UNPARTITIONED|
-                      assign [$$34] <- [TRUE] -- |UNPARTITIONED|
-                        assign [$$36] <- [function-call: asterix:field-access-by-index, Args:[%0->$$10, AInt32: {1}]] -- |UNPARTITIONED|
-                          data-scan []<-[$$32, $$10] <- emergencyTest:userLocations -- |UNPARTITIONED|
-                            empty-tuple-source -- |UNPARTITIONED|
-                assign [$$22] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$25]] -- |UNPARTITIONED|
-                  aggregate [$$25] <- [function-call: asterix:listify, Args:[%0->$$24]] -- |UNPARTITIONED|
-                    assign [$$24] <- [function-call: asterix:field-access-by-index, Args:[%0->$$11, AInt32: {1}]] -- |UNPARTITIONED|
-                      data-scan []<-[$$33, $$11] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
-                        empty-tuple-source -- |UNPARTITIONED|
---------------------------------
-
----------- Example 2 -----------
-FINE: >>>> Before plan
-distribute result [%0->$$8] -- |UNPARTITIONED|
-  project ([$$8]) -- |UNPARTITIONED|
-    unnest $$8 <- function-call: asterix:scan-collection, Args:[%0->$$41] -- |UNPARTITIONED|
+                    assign [$$16] <- [function-call: asterix:field-access-by-name, Args:[%0->$$19, AString: {o_custkey}]] -- |UNPARTITIONED|
+                      assign [$$19] <- [function-call: asterix:field-access-by-name, Args:[%0->$$1, AString: {o_$o}]] -- |UNPARTITIONED|
+                        data-scan []<-[$$15, $$1] <- tpch:Orders -- |UNPARTITIONED|
+                          empty-tuple-source -- |UNPARTITIONED|
+             } -- |UNPARTITIONED|
+        assign [$$18] <- [function-call: asterix:field-access-by-index, Args:[%0->$$0, AInt32: {3}]] -- |UNPARTITIONED|
+          data-scan []<-[$$14, $$0] <- tpch:Customers -- |UNPARTITIONED|
+            empty-tuple-source -- |UNPARTITIONED|
+
+After plan:
+distribute result [%0->$$13] -- |UNPARTITIONED|
+  project ([$$13]) -- |UNPARTITIONED|
+    assign [$$13] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$12]] -- |UNPARTITIONED|
+      group by ([$$24 := %0->$$14]) decor ([%0->$$0; %0->$$18]) {
+                aggregate [$$12] <- [function-call: asterix:listify, Args:[%0->$$1]] -- |UNPARTITIONED|
+                  select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$23]]) -- |UNPARTITIONED|
+                    nested tuple source -- |UNPARTITIONED|
+             } -- |UNPARTITIONED|
+        left outer join (function-call: algebricks:eq, Args:[%0->$$14, %0->$$22]) -- |UNPARTITIONED|
+          assign [$$18] <- [function-call: asterix:field-access-by-index, Args:[%0->$$0, AInt32: {3}]] -- |UNPARTITIONED|
+            data-scan []<-[$$14, $$0] <- tpch:Customers -- |UNPARTITIONED|
+              empty-tuple-source -- |UNPARTITIONED|
+          assign [$$23] <- [TRUE] -- |UNPARTITIONED|
+            join (function-call: algebricks:eq, Args:[%0->$$16, %0->$$22]) -- |UNPARTITIONED|
+              select (function-call: algebricks:eq, Args:[%0->$$20, AInt64: {5}]) -- |UNPARTITIONED|
+                assign [$$20] <- [function-call: asterix:field-access-by-index, Args:[%0->$$21, AInt32: {3}]] -- |UNPARTITIONED|
+                  data-scan []<-[$$22, $$21] <- tpch:Customers -- |UNPARTITIONED|
+                    empty-tuple-source -- |UNPARTITIONED|
+              assign [$$16] <- [function-call: asterix:field-access-by-name, Args:[%0->$$19, AString: {o_custkey}]] -- |UNPARTITIONED|
+                assign [$$19] <- [function-call: asterix:field-access-by-name, Args:[%0->$$1, AString: {o_$o}]] -- |UNPARTITIONED|
+                  data-scan []<-[$$15, $$1] <- tpch:Orders -- |UNPARTITIONED|
+                    empty-tuple-source -- |UNPARTITIONED|
+
+Special Cases
+For special cases where:
+a. there is a join (let's call it J1) in the nested plan,
+b. if J1 is an inner join, one input pipeline of J1 has a NestedTupleSource descendant (let's call it N1),
+c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1),
+d. there is no tuple dropping from N1 to J1
+
+Rewriting R2 is not necessary since before J1, all tuples from N1 are preserved. But the following rewritings are needed:
+R1'. Replace N1 with O1 (no additional deep copy);
+R2'. All inner joins on the path from N1 to J1 (including J1) become left-outer joins with the same join conditions;
+R3'. If N1 resides in the right branch of an inner join (let's call it J2) in the path from N1 to J1, switch the left and right branches of J2;
+R4'. For every left outer join on the path from N1 to J1 that was transformed from an inner join, a variable vi indicating non-match tuples is assigned to TRUE in its right branch;
+R5'. On top of J1, a GroupByOperator G1 is added where the group-by key is the primary key of O1 and the nested query plan for aggregation is the nested pipeline
+     on top of J1 with an added not-null filter to check that all vi are not null.
+R6'. All other NestedTupleSourceOperators in the subplan are inlined with deep copies (with new variables) of the query plan rooted at O1.
+
+This is an abstract example for the special rewriting mechanism described above:
+Before rewriting:
+--Op1
+  --Subplan{
+    --AggregateOp
+      --NestedOp
+        --Inner Join (J1)
+          --(Right branch) ..... (L1)
+          --(Left branch) ..... (R1)
+                    --Nested-Tuple-Source
+    }
+    --InputOp
+      .....
+(Note that pipeline R1 must satisfy the condition that it does not drop any tuples.)
+After rewriting:
+--Op1
+  --GroupBy v_lc_1, ..., v_lc_n Decor v_l1, ....v_ln {
+            --AggregateOp
+               --NestedOp
+                 --Select v_new!=NULL
+                   --Nested-Tuple-Source
+          }
+     --LeftOuterJoin (J1)
+       (left branch)
+              --......  (R1)
+                 --InputOp
+                   .....
+       (right branch)
+             --Assign v_new=TRUE
+                --..... (L1)
+
+In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables
+at InputOp except the covering variables v_lc_1, ..., v_lc_n.  In the current implementation, we use "covering" variables as primary key variables. In the next version,
+we will use the real primary key variables, which will fix ASTERIXDB-1168.
+
+Here is a concrete example of the special case rewriting (optimizerts/queries/nested_loj2.aql).
+Before plan:
+distribute result [%0->$$17] -- |UNPARTITIONED|
+  project ([$$17]) -- |UNPARTITIONED|
+    assign [$$17] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$16]] -- |UNPARTITIONED|
       subplan {
-                aggregate [$$41] <- [function-call: asterix:listify, Args:[%0->$$38]] -- |UNPARTITIONED|
-                  assign [$$38] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$54, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$19]] -- |UNPARTITIONED|
-                    unnest $$19 <- function-call: asterix:scan-collection, Args:[%0->$$37] -- |UNPARTITIONED|
-                      subplan {
-                                aggregate [$$37] <- [function-call: asterix:listify, Args:[%0->$$33]] -- |UNPARTITIONED|
-                                  join (TRUE) -- |UNPARTITIONED|
-                                    select (%0->$$32) -- |UNPARTITIONED|
-                                      group by ([$$16 := %0->$$47; $$43 := %0->$$48; $$15 := %0->$$49]) decor ([%0->$$7; %0->$$42; %0->$$14]) {
-                                                aggregate [$$32] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
-                                                  select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$46]]) -- |UNPARTITIONED|
-                                                    nested tuple source -- |UNPARTITIONED|
-                                             } -- |UNPARTITIONED|
-                                        left outer join (function-call: algebricks:and, Args:[function-call: algebricks:eq, Args:[%0->$$50, %0->$$14], function-call: asterix:spatial-intersect, Args:[%0->$$47, %0->$$53]]) -- |UNPARTITIONED|
-                                          assign [$$47] <- [function-call: asterix:create-circle, Args:[%0->$$51, %0->$$52]] -- |UNPARTITIONED|
-                                            assign [$$51] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {1}]] -- |UNPARTITIONED|
-                                              assign [$$52] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {2}]] -- |UNPARTITIONED|
-                                                data-scan []<-[$$48, $$49] <- emergencyTest:CHPReports -- |UNPARTITIONED|
-                                                  nested tuple source -- |UNPARTITIONED|
-                                          assign [$$46] <- [TRUE] -- |UNPARTITIONED|
-                                            assign [$$50] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {1}]] -- |UNPARTITIONED|
-                                              assign [$$53] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {2}]] -- |UNPARTITIONED|
-                                                data-scan []<-[$$44, $$17] <- emergencyTest:userLocations -- |UNPARTITIONED|
-                                                  empty-tuple-source -- |UNPARTITIONED|
-                                    assign [$$33] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$36]] -- |UNPARTITIONED|
-                                      aggregate [$$36] <- [function-call: asterix:listify, Args:[%0->$$35]] -- |UNPARTITIONED|
-                                        assign [$$35] <- [function-call: asterix:field-access-by-index, Args:[%0->$$18, AInt32: {1}]] -- |UNPARTITIONED|
-                                          data-scan []<-[$$45, $$18] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
-                                            empty-tuple-source -- |UNPARTITIONED|
-                             } -- |UNPARTITIONED|
+                aggregate [$$16] <- [function-call: asterix:listify, Args:[%0->$$15]] -- |UNPARTITIONED|
+                  assign [$$15] <- [function-call: asterix:open-record-constructor, Args:[AString: {order}, %0->$$1, AString: {items}, %0->$$14]] -- |UNPARTITIONED|
+                    subplan {
+                              aggregate [$$14] <- [function-call: asterix:listify, Args:[%0->$$2]] -- |UNPARTITIONED|
+                                join (function-call: algebricks:eq, Args:[%0->$$20, %0->$$19]) -- |UNPARTITIONED|
+                                  nested tuple source -- |UNPARTITIONED|
+                                  data-scan []<-[$$20, $$21, $$2] <- tpch:LineItems -- |UNPARTITIONED|
+                                    empty-tuple-source -- |UNPARTITIONED|
+                           } -- |UNPARTITIONED|
+                      join (function-call: algebricks:eq, Args:[%0->$$22, %0->$$18]) -- |UNPARTITIONED|
                         nested tuple source -- |UNPARTITIONED|
+                        assign [$$22] <- [function-call: asterix:field-access-by-index, Args:[%0->$$1, AInt32: {1}]] -- |UNPARTITIONED|
+                          data-scan []<-[$$19, $$1] <- tpch:Orders -- |UNPARTITIONED|
+                            empty-tuple-source -- |UNPARTITIONED|
              } -- |UNPARTITIONED|
-        assign [$$14] <- [function-call: asterix:field-access-by-index, Args:[%0->$$7, AInt32: {1}]] -- |UNPARTITIONED|
-          assign [$$54] <- [function-call: asterix:field-access-by-name, Args:[%0->$$7, AString: {subscription-id}]] -- |UNPARTITIONED|
-            data-scan []<-[$$42, $$7] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
-              empty-tuple-source -- |UNPARTITIONED|
-
-
-Dec 28, 2015 12:48:30 PM org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController printRuleApplication
-FINE: >>>> After plan
-distribute result [%0->$$8] -- |UNPARTITIONED|
-  project ([$$8]) -- |UNPARTITIONED|
-    unnest $$8 <- function-call: asterix:scan-collection, Args:[%0->$$41] -- |UNPARTITIONED|
-      group by ([$$60 := %0->$$42]) decor ([%0->$$54; %0->$$7; %0->$$14]) {
-                aggregate [$$41] <- [function-call: asterix:listify, Args:[%0->$$38]] -- |UNPARTITIONED|
-                  select (function-call: algebricks:not-null, Args:[%0->$$59]) -- |UNPARTITIONED|
-                    nested tuple source -- |UNPARTITIONED|
+        data-scan []<-[$$18, $$0] <- tpch:Customers -- |UNPARTITIONED|
+          empty-tuple-source -- |UNPARTITIONED|
+
+After plan:
+distribute result [%0->$$17] -- |UNPARTITIONED|
+  project ([$$17]) -- |UNPARTITIONED|
+    assign [$$17] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$16]] -- |UNPARTITIONED|
+      group by ([$$30 := %0->$$18]) decor ([%0->$$0]) {
+                aggregate [$$16] <- [function-call: asterix:listify, Args:[%0->$$15]] -- |UNPARTITIONED|
+                  assign [$$15] <- [function-call: asterix:open-record-constructor, Args:[AString: {order}, %0->$$1, AString: {items}, %0->$$14]] -- |UNPARTITIONED|
+                    group by ([$$27 := %0->$$19]) decor ([%0->$$0; %0->$$1; %0->$$18; %0->$$22]) {
+                              aggregate [$$14] <- [function-call: asterix:listify, Args:[%0->$$2]] -- |UNPARTITIONED|
+                                select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$26]]) -- |UNPARTITIONED|
+                                  nested tuple source -- |UNPARTITIONED|
+                           } -- |UNPARTITIONED|
+                      select (function-call: algebricks:and, Args:[function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$28]], function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$29]]]) -- |UNPARTITIONED|
+                        nested tuple source -- |UNPARTITIONED|
              } -- |UNPARTITIONED|
-        left outer join (function-call: algebricks:eq, Args:[%0->$$42, %0->$$58]) -- |UNPARTITIONED|
-          assign [$$14] <- [function-call: asterix:field-access-by-index, Args:[%0->$$7, AInt32: {1}]] -- |UNPARTITIONED|
-            assign [$$54] <- [function-call: asterix:field-access-by-name, Args:[%0->$$7, AString: {subscription-id}]] -- |UNPARTITIONED|
-              data-scan []<-[$$42, $$7] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
+        left outer join (function-call: algebricks:eq, Args:[%0->$$20, %0->$$19]) -- |UNPARTITIONED|
+          left outer join (function-call: algebricks:eq, Args:[%0->$$22, %0->$$18]) -- |UNPARTITIONED|
+            data-scan []<-[$$18, $$0] <- tpch:Customers -- |UNPARTITIONED|
+              empty-tuple-source -- |UNPARTITIONED|
+            assign [$$28] <- [TRUE] -- |UNPARTITIONED|
+              assign [$$22] <- [function-call: asterix:field-access-by-index, Args:[%0->$$1, AInt32: {1}]] -- |UNPARTITIONED|
+                data-scan []<-[$$19, $$1] <- tpch:Orders -- |UNPARTITIONED|
+                  empty-tuple-source -- |UNPARTITIONED|
+          assign [$$29] <- [TRUE] -- |UNPARTITIONED|
+            assign [$$26] <- [TRUE] -- |UNPARTITIONED|
+              data-scan []<-[$$20, $$21, $$2] <- tpch:LineItems -- |UNPARTITIONED|
                 empty-tuple-source -- |UNPARTITIONED|
-          assign [$$59] <- [TRUE] -- |UNPARTITIONED|
-            assign [$$38] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$57, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$19]] -- |UNPARTITIONED|
-              unnest $$19 <- function-call: asterix:scan-collection, Args:[%0->$$37] -- |UNPARTITIONED|
-                group by ([$$66 := %0->$$58]) decor ([%0->$$55; %0->$$56; %0->$$57]) {
-                          aggregate [$$37] <- [function-call: asterix:listify, Args:[%0->$$33]] -- |UNPARTITIONED|
-                            select (function-call: algebricks:not-null, Args:[%0->$$65]) -- |UNPARTITIONED|
-                              nested tuple source -- |UNPARTITIONED|
-                       } -- |UNPARTITIONED|
-                  left outer join (function-call: algebricks:eq, Args:[%0->$$58, %0->$$64]) -- |UNPARTITIONED|
-                    assign [$$55] <- [function-call: asterix:field-access-by-index, Args:[%0->$$56, AInt32: {1}]] -- |UNPARTITIONED|
-                      assign [$$57] <- [function-call: asterix:field-access-by-name, Args:[%0->$$56, AString: {subscription-id}]] -- |UNPARTITIONED|
-                        data-scan []<-[$$58, $$56] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
-                          empty-tuple-source -- |UNPARTITIONED|
-                    assign [$$65] <- [TRUE] -- |UNPARTITIONED|
-                      join (TRUE) -- |UNPARTITIONED|
-                        select (%0->$$32) -- |UNPARTITIONED|
-                          group by ([$$16 := %0->$$47; $$43 := %0->$$48; $$15 := %0->$$49]) decor ([%0->$$62; %0->$$64; %0->$$61]) {
-                                    aggregate [$$32] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
-                                      select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$46]]) -- |UNPARTITIONED|
-                                        nested tuple source -- |UNPARTITIONED|
-                                 } -- |UNPARTITIONED|
-                            left outer join (function-call: algebricks:and, Args:[function-call: algebricks:eq, Args:[%0->$$50, %0->$$61], function-call: asterix:spatial-intersect, Args:[%0->$$47, %0->$$53]]) -- |UNPARTITIONED|
-                              assign [$$47] <- [function-call: asterix:create-circle, Args:[%0->$$51, %0->$$52]] -- |UNPARTITIONED|
-                                assign [$$51] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {1}]] -- |UNPARTITIONED|
-                                  assign [$$52] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {2}]] -- |UNPARTITIONED|
-                                    data-scan []<-[$$48, $$49] <- emergencyTest:CHPReports -- |UNPARTITIONED|
-                                      assign [$$61] <- [function-call: asterix:field-access-by-index, Args:[%0->$$62, AInt32: {1}]] -- |UNPARTITIONED|
-                                        assign [$$63] <- [function-call: asterix:field-access-by-name, Args:[%0->$$62, AString: {subscription-id}]] -- |UNPARTITIONED|
-                                          data-scan []<-[$$64, $$62] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
-                                            empty-tuple-source -- |UNPARTITIONED|
-                              assign [$$46] <- [TRUE] -- |UNPARTITIONED|
-                                assign [$$50] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {1}]] -- |UNPARTITIONED|
-                                  assign [$$53] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {2}]] -- |UNPARTITIONED|
-                                    data-scan []<-[$$44, $$17] <- emergencyTest:userLocations -- |UNPARTITIONED|
-                                      empty-tuple-source -- |UNPARTITIONED|
-                        assign [$$33] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$36]] -- |UNPARTITIONED|
-                          aggregate [$$36] <- [function-call: asterix:listify, Args:[%0->$$35]] -- |UNPARTITIONED|
-                            assign [$$35] <- [function-call: asterix:field-access-by-index, Args:[%0->$$18, AInt32: {1}]] -- |UNPARTITIONED|
-                              data-scan []<-[$$45, $$18] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
-                                empty-tuple-source -- |UNPARTITIONED|
----------------------------------
- */
+*/
 public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRewriteRule {
 
     // To make sure the rule only runs once.
@@ -306,54 +279,82 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
             // Traverses non-subplan operators.
             return traverseNonSubplanOperator(op, context);
         }
+        /**
+         * Apply the special join-based rewriting.
+         */
+        Pair<Boolean, Map<LogicalVariable, LogicalVariable>> result = applySpecialFlattening(opRef, context);
+        if (!result.first) {
+            /**
+             * If the special join-based rewriting does not apply, apply the general
+             * rewriting which blindly inlines all NTSs.
+             */
+            result = applyGeneralFlattening(opRef, context);
+        }
+        return result;
+    }
 
-        SubplanOperator subplanOp = (SubplanOperator) op;
-        if (!containsDataSourceScan(subplanOp)) {
-            // Traverses the operator as if it is not a subplan.
-            return traverseNonSubplanOperator(op, context);
+    /**
+     * Rewrites each input of {@code op} (not its nested plans) and substitutes replaced variables in {@code op}.
+     *
+     * @param op
+     *            the operator to consider; an unflattened SubplanOperator is also traversed this way
+     * @param context the optimization context
+     * @return the changed flag, and the replacements for variables live at {@code op} before rewriting
+     * @throws AlgebricksException if rewriting an input or substituting variables fails
+     */
+    private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> traverseNonSubplanOperator(ILogicalOperator op,
+            IOptimizationContext context) throws AlgebricksException {
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<LogicalVariable, LogicalVariable>();
+        Map<LogicalVariable, LogicalVariable> replacedVarMapForAncestor = new HashMap<LogicalVariable, LogicalVariable>();
+        boolean changed = false;
+        for (Mutable<ILogicalOperator> childrenRef : op.getInputs()) {
+            Pair<Boolean, Map<LogicalVariable, LogicalVariable>> resultFromChild = rewriteSubplanOperator(childrenRef,
+                    context);
+            changed = changed || resultFromChild.first;
+            for (Map.Entry<LogicalVariable, LogicalVariable> entry : resultFromChild.second.entrySet()) {
+                if (liveVars.contains(entry.getKey())) {
+                    // Only needs to map live variables for its ancestors.
+                    replacedVarMapForAncestor.put(entry.getKey(), entry.getValue());
+                }
+            }
+            replacedVarMap.putAll(resultFromChild.second);
         }
+        VariableUtilities.substituteVariables(op, replacedVarMap, context);
+        context.computeAndSetTypeEnvironmentForOperator(op);
+        return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(changed, replacedVarMapForAncestor);
+    }
 
-        Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(0);
+    private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> applyGeneralFlattening(Mutable<ILogicalOperator> opRef,
+            IOptimizationContext context) throws AlgebricksException {
+        SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
+        if (!SubplanFlatteningUtil.containsOperators(subplanOp,
+                ImmutableSet.of(LogicalOperatorTag.DATASOURCESCAN, LogicalOperatorTag.INNERJOIN,
+                        // We don't have nested runtime for union-all and distinct hence we have to include them here.
+                        LogicalOperatorTag.LEFTOUTERJOIN, LogicalOperatorTag.UNIONALL, LogicalOperatorTag.DISTINCT))) {
+            // Traverses the operator as if it is not a subplan.
+            return traverseNonSubplanOperator(subplanOp, context);
+        }
+        Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0);
         ILogicalOperator inputOp = inputOpRef.getValue();
-        Map<LogicalVariable, LogicalVariable> varMap = inlineNestedTupleSource(subplanOp, inputOp, context);
+        Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>> varMapAndOrderExprs = SubplanFlatteningUtil
+                .inlineAllNestedTupleSource(subplanOp, context);
+        Map<LogicalVariable, LogicalVariable> varMap = varMapAndOrderExprs.first;
+        if (varMap == null) {
+            // Traverses the operator as if it is not a subplan.
+            return traverseNonSubplanOperator(subplanOp, context);
+        }
 
         // Creates parameters for the left outer join operator.
         Set<LogicalVariable> inputLiveVars = new HashSet<LogicalVariable>();
         VariableUtilities.getLiveVariables(inputOp, inputLiveVars);
-        PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses((AbstractLogicalOperator) inputOp, context);
-        List<FunctionalDependency> fds = context.getFDList(inputOp);
-        Set<LogicalVariable> fdCoveringVars = findFDHeaderVariables(fds, inputLiveVars);
+        Set<LogicalVariable> fdCoveringVars = EquivalenceClassUtils.findFDHeaderVariables(context, inputOp);
 
         Mutable<ILogicalOperator> rightInputOpRef = subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue()
                 .getInputs().get(0);
         ILogicalOperator rightInputOp = rightInputOpRef.getValue();
 
-        Set<LogicalVariable> rightInputLiveVars = new HashSet<LogicalVariable>();
-        VariableUtilities.getLiveVariables(rightInputOp, rightInputLiveVars);
-        Set<LogicalVariable> rightMissingCoveringVars = new HashSet<>();
-        Set<LogicalVariable> varsToEnforce = new HashSet<>();
-        for (LogicalVariable liveVar : fdCoveringVars) {
-            LogicalVariable rightVar = varMap.get(liveVar);
-            if (!rightInputLiveVars.contains(rightVar)) {
-                // Some correlated variables killed in the subplan, therefore needs to be preserved in the subplan.
-                varsToEnforce.add(rightVar);
-                rightMissingCoveringVars.add(liveVar);
-            }
-        }
-        // Recovers killed-variables in leftVars in the query plan rooted at rightInputOp.
-        if (!varsToEnforce.isEmpty()) {
-            Map<LogicalVariable, LogicalVariable> map = VariableUtilities
-                    .enforceVariablesInDescendantsAndSelf(rightInputOpRef, varsToEnforce, context);
-            // Re-maps variables in the left input branch to the variables in the right input branch
-            for (LogicalVariable var : rightMissingCoveringVars) {
-                LogicalVariable rightVar = varMap.get(var);
-                LogicalVariable newVar = map.get(rightVar);
-                if (newVar != null) {
-                    varMap.put(var, newVar);
-                }
-            }
-        }
-
         // Creates a variable to indicate whether a left input tuple is killed in the plan rooted at rightInputOp.
         LogicalVariable assignVar = context.newVar();
         ILogicalOperator assignOp = new AssignOperator(assignVar,
@@ -390,10 +391,8 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
         GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
 
         Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<>();
-        Map<LogicalVariable, LogicalVariable> gbyVarMap = new HashMap<LogicalVariable, LogicalVariable>();
         for (LogicalVariable liveVar : fdCoveringVars) {
             LogicalVariable newVar = context.newVar();
-            gbyVarMap.put(liveVar, newVar);
             groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
                     new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
             // Adds variables for replacements in ancestors.
@@ -411,6 +410,15 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
         Mutable<ILogicalOperator> aggOpRef = subplanOp.getNestedPlans().get(0).getRoots().get(0);
         aggOpRef.getValue().getInputs().clear();
 
+        Mutable<ILogicalOperator> currentOpRef = aggOpRef;
+        // Adds an optional order operator.
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = varMapAndOrderExprs.second;
+        if (!orderExprs.isEmpty()) {
+            OrderOperator orderOp = new OrderOperator(orderExprs);
+            currentOpRef = new MutableObject<ILogicalOperator>(orderOp);
+            aggOpRef.getValue().getInputs().add(currentOpRef);
+        }
+
         // Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, i.e.,
         // subplan input tuples that are filtered out within a subplan.
         Mutable<ILogicalExpression> filterVarExpr = new MutableObject<ILogicalExpression>(
@@ -424,7 +432,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
                 new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
                         FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT), argsForNotFunction)),
                 false, null);
-        aggOpRef.getValue().getInputs().add(new MutableObject<ILogicalOperator>(selectOp));
+        currentOpRef.getValue().getInputs().add(new MutableObject<ILogicalOperator>(selectOp));
 
         selectOp.getInputs().add(new MutableObject<ILogicalOperator>(
                 new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(groupbyOp))));
@@ -432,181 +440,97 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
         nestedRoots.add(aggOpRef);
         nestedPlans.add(new ALogicalPlanImpl(nestedRoots));
         groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(leftOuterJoinOp));
-        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(aggOpRef.getValue(), context);
 
         // Replaces subplan with the group-by operator.
         opRef.setValue(groupbyOp);
-        context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(groupbyOp, context);
 
         // Recursively applys this rule to the nested plan of the subplan operator,
         // for the case where there are nested subplan operators within {@code subplanOp}.
-        // Note that we do not need to use the resulting variable map to further replace variables,
-        // because rightInputOp must be an aggregate operator which kills all incoming variables.
-        traverseNonSubplanOperator(rightInputOp, context);
+        Pair<Boolean, Map<LogicalVariable, LogicalVariable>> result = rewriteSubplanOperator(rightInputOpRef, context);
+        VariableUtilities.substituteVariables(leftOuterJoinOp, result.second, context);
+        VariableUtilities.substituteVariables(groupbyOp, result.second, context);
+
+        // No var mapping from the right input operator should be populated up.
         return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(true, replacedVarMap);
     }
 
-    /**
-     * @param subplanOp
-     *            a SubplanOperator
-     * @return whether there is a data source scan in the nested logical plans of {@code subplanOp}.
-     */
-    private boolean containsDataSourceScan(SubplanOperator subplanOp) {
-        List<ILogicalPlan> nestedPlans = subplanOp.getNestedPlans();
-        for (ILogicalPlan nestedPlan : nestedPlans) {
-            for (Mutable<ILogicalOperator> opRef : nestedPlan.getRoots()) {
-                if (containsDataScanInDescendantsAndSelf(opRef.getValue())) {
-                    return true;
-                }
-            }
-        }
-        return false;
-    }
+    private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> applySpecialFlattening(Mutable<ILogicalOperator> opRef,
+            IOptimizationContext context) throws AlgebricksException {
+        SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
+        ILogicalOperator inputOp = subplanOp.getInputs().get(0).getValue();
+        Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<>();
 
-    /**
-     * Whether the query plan rooted {@code currentOp} contains a data source scan operator,
-     * with considering nested subplans.
-     *
-     * @param currentOp
-     *            the current operator
-     * @return true if {@code currentOp} contains a data source scan operator; false otherwise.
-     */
-    private boolean containsDataScanInDescendantsAndSelf(ILogicalOperator currentOp) {
-        if (currentOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
-            return true;
-        }
-        if (currentOp.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
-            if (containsDataSourceScan((SubplanOperator) currentOp)) {
-                return true;
-            }
-        }
-        for (Mutable<ILogicalOperator> childRef : currentOp.getInputs()) {
-            if (containsDataScanInDescendantsAndSelf(childRef.getValue())) {
-                return true;
-            }
-        }
-        return false;
-    }
+        // Recursively applies this rule to the nested plan first, for nested subplan operators within {@code subplanOp}.
+        // NOTE(review): if the special rewriting is inapplicable below, this result's flag and var map are dropped.
+        Pair<Boolean, Map<LogicalVariable, LogicalVariable>> result = rewriteSubplanOperator(
+                subplanOp.getNestedPlans().get(0).getRoots().get(0), context);
 
-    /**
-     * Find the header variables that can imply all the variables in {@code liveVars}
-     *
-     * @param fds,
-     *            a list of functional dependencies
-     * @param liveVars,
-     *            a set of live variables
-     * @return a set of covering variables that can imply all live variables.
-     */
-    private Set<LogicalVariable> findFDHeaderVariables(List<FunctionalDependency> fds, Set<LogicalVariable> liveVars) {
-        Set<LogicalVariable> key = new HashSet<>();
-        Set<LogicalVariable> cover = new HashSet<>();
-        for (FunctionalDependency fd : fds) {
-            key.addAll(fd.getHead());
-            cover.addAll(fd.getTail());
-        }
-        if (cover.equals(liveVars)) {
-            return key;
-        } else {
-            return liveVars;
+        Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>> notNullVarsAndTopJoinRef = SubplanFlatteningUtil
+                .inlineLeftNtsInSubplanJoin(subplanOp, context);
+        if (notNullVarsAndTopJoinRef.first == null) {
+            return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(false, replacedVarMap);
         }
-    }
 
-    /***
-     * Deals with operators that are not SubplanOperator.
-     *
-     * @param op
-     *            the operator to consider
-     * @param context
-     * @return
-     * @throws AlgebricksException
-     */
-    private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> traverseNonSubplanOperator(ILogicalOperator op,
-            IOptimizationContext context) throws AlgebricksException {
+        Set<LogicalVariable> notNullVars = notNullVarsAndTopJoinRef.first;
+        Mutable<ILogicalOperator> topJoinRef = notNullVarsAndTopJoinRef.second;
+
+        // Gets live variables and covering variables from the subplan's input operator.
+        Set<LogicalVariable> fdCoveringVars = EquivalenceClassUtils.findFDHeaderVariables(context, inputOp);
         Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<LogicalVariable, LogicalVariable>();
-        Map<LogicalVariable, LogicalVariable> replacedVarMapForAncestor = new HashMap<LogicalVariable, LogicalVariable>();
-        boolean changed = false;
-        for (Mutable<ILogicalOperator> childrenRef : op.getInputs()) {
-            Pair<Boolean, Map<LogicalVariable, LogicalVariable>> resultFromChild = rewriteSubplanOperator(childrenRef,
-                    context);
-            changed = changed || resultFromChild.first;
-            for (Map.Entry<LogicalVariable, LogicalVariable> entry : resultFromChild.second.entrySet()) {
-                if (liveVars.contains(entry.getKey())) {
-                    // Only needs to map live variables for its ancestors.
-                    replacedVarMapForAncestor.put(entry.getKey(), entry.getValue());
-                }
-            }
-            replacedVarMap.putAll(resultFromChild.second);
-        }
-        for (Map.Entry<LogicalVariable, LogicalVariable> entry : replacedVarMap.entrySet()) {
-            VariableUtilities.substituteVariables(op, entry.getKey(), entry.getValue(), context);
-        }
-        context.computeAndSetTypeEnvironmentForOperator(op);
-        return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(changed, replacedVarMapForAncestor);
-    }
+        VariableUtilities.getLiveVariables(inputOp, liveVars);
 
-    /**
-     * Inline a subplan's input to replace the NTSs.
-     *
-     * @param subplanOp
-     *            the subplan operator
-     * @param inputOperator
-     *            the input operator to the subplan
-     * @param context
-     *            the optimization context
-     * @return a map that maps from the variables propagated in {@code inputOperator} to the variables
-     *         defined in the deeply copied query plan.
-     * @throws AlgebricksException
-     */
-    private Map<LogicalVariable, LogicalVariable> inlineNestedTupleSource(SubplanOperator subplanOp,
-            ILogicalOperator inputOperator, IOptimizationContext context) throws AlgebricksException {
-        List<ILogicalPlan> nestedPlans = subplanOp.getNestedPlans();
-        Map<LogicalVariable, LogicalVariable> varMap = new HashMap<LogicalVariable, LogicalVariable>();
-        for (ILogicalPlan plan : nestedPlans) {
-            List<Mutable<ILogicalOperator>> roots = plan.getRoots();
-            for (Mutable<ILogicalOperator> root : roots) {
-                varMap.putAll(replaceNestedTupleSource(root, inputOperator, context));
-            }
-        }
-        return varMap;
-    }
+        // Creates a group-by operator.
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, subplanOp.getNestedPlans());
 
-    /**
-     * Deep copy the query plan rooted at {@code inputOperator} and replace NTS with the copied plan.
-     *
-     * @param currentInputOpRef,
-     *            the current operator within a subplan
-     * @param inputOperator,
-     *            the input operator to the subplan
-     * @param context
-     *            the optimization context
-     * @return a map that maps from the variables propagated in {@code inputOperator} to the variables
-     *         defined in the deeply copied query plan.
-     * @throws AlgebricksException
-     */
-    private Map<LogicalVariable, LogicalVariable> replaceNestedTupleSource(Mutable<ILogicalOperator> currentInputOpRef,
-            ILogicalOperator inputOperator, IOptimizationContext context) throws AlgebricksException {
-        AbstractLogicalOperator currentOp = (AbstractLogicalOperator) currentInputOpRef.getValue();
-        if (currentOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
-            LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
-                    context);
-            ILogicalOperator copiedInputOperator = deepCopyVisitor.deepCopy(inputOperator, inputOperator);
-            // Updates the primary key info in the copied plan segment.
-            context.updatePrimaryKeys(deepCopyVisitor.getInputToOutputVariableMapping());
-            currentInputOpRef.setValue(copiedInputOperator);
-            return deepCopyVisitor.getInputToOutputVariableMapping();
+        Map<LogicalVariable, LogicalVariable> gbyVarMap = new HashMap<LogicalVariable, LogicalVariable>();
+        for (LogicalVariable coverVar : fdCoveringVars) {
+            LogicalVariable newVar = context.newVar();
+            gbyVarMap.put(coverVar, newVar);
+            groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(coverVar))));
+            // Adds variables for replacements in ancestors. NOTE(review): gbyVarMap is populated but never read.
+            replacedVarMap.put(coverVar, newVar);
         }
-        // Obtains the variable mapping from child.
-        Map<LogicalVariable, LogicalVariable> varMap = new HashMap<LogicalVariable, LogicalVariable>();
-        for (Mutable<ILogicalOperator> child : currentOp.getInputs()) {
-            varMap.putAll(replaceNestedTupleSource(child, inputOperator, context));
+        for (LogicalVariable liveVar : liveVars) {
+            if (fdCoveringVars.contains(liveVar)) {
+                continue;
+            }
+            groupByDecorList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
         }
-        // Substitutes variables in the query plan rooted at currentOp.
-        for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
-            VariableUtilities.substituteVariables(currentOp, entry.getKey(), entry.getValue(), context);
+        groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(topJoinRef.getValue()));
+        // Builds the select for the group-by's nested plan: keeps only tuples where every variable in notNullVars
+        // is non-NULL, i.e., drops subplan input tuples that are filtered out within a subplan.
+        // NOTE(review): nullCheckExprRefs.get(0) below assumes notNullVars is non-empty -- confirm.
+        List<Mutable<ILogicalExpression>> nullCheckExprRefs = new ArrayList<>();
+        for (LogicalVariable notNullVar : notNullVars) {
+            Mutable<ILogicalExpression> filterVarExpr = new MutableObject<ILogicalExpression>(
+                    new VariableReferenceExpression(notNullVar));
+            List<Mutable<ILogicalExpression>> args = new ArrayList<Mutable<ILogicalExpression>>();
+            args.add(filterVarExpr);
+            List<Mutable<ILogicalExpression>> argsForNotFunction = new ArrayList<Mutable<ILogicalExpression>>();
+            argsForNotFunction.add(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL), args)));
+            nullCheckExprRefs.add(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT), argsForNotFunction)));
         }
-        return varMap;
-    }
+        Mutable<ILogicalExpression> selectExprRef = nullCheckExprRefs.size() > 1
+                ? new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.AND), nullCheckExprRefs))
+                : nullCheckExprRefs.get(0);
+        SelectOperator selectOp = new SelectOperator(selectExprRef, false, null);
+        topJoinRef.setValue(selectOp);
+        selectOp.getInputs().add(new MutableObject<ILogicalOperator>(
+                new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(groupbyOp))));
+        // Replaces the subplan with the group-by operator and recomputes type environments bottom-up.
+        opRef.setValue(groupbyOp);
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(groupbyOp, context);
 
+        VariableUtilities.substituteVariables(groupbyOp, result.second, context);
+        replacedVarMap.putAll(result.second);
+        return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(true, replacedVarMap);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
new file mode 100644
index 0000000..6d91923
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Static helpers that flatten SubplanOperators; used by InlineSubplanInputForNestedTupleSourceRule.
+ */
+final class SubplanFlatteningUtil {
+
+    private SubplanFlatteningUtil() {
+        // Static utility holder; never instantiated.
+    }
+
+    /**
+     * Blindly inlines all NestedTupleSource operators in the nested plan of a subplan operator.
+     *
+     * @param subplanOp
+     *            the subplan operator
+     * @param context
+     *            the optimization context
+     * @return a pair of (1) a map that maps primary key variables in the subplan's input to their deep
+     *         copies in the nested pipeline and (2) the ordering that needs to be maintained for the
+     *         final aggregation in the added group-by operator; {@code (null, null)} with no rewriting
+     *         if {@code subplanOp} is a nested subplan (an ancestor of a NestedTupleSource operator).
+     * @throws AlgebricksException
+     *             if rewriting the nested plan or substituting variables fails
+     */
+    public static Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>> inlineAllNestedTupleSource(
+            SubplanOperator subplanOp, IOptimizationContext context) throws AlgebricksException {
+        // For nested subplan, we do not continue for the general inlining.
+        if (OperatorManipulationUtil.ancestorOfOperators(subplanOp,
+                ImmutableSet.of(LogicalOperatorTag.NESTEDTUPLESOURCE))) {
+            return new Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>>(
+                    null, null);
+        }
+        InlineAllNtsInSubplanVisitor visitor = new InlineAllNtsInSubplanVisitor(context, subplanOp);
+
+        // Rewrites the query plan below the nested plan's root.
+        ILogicalOperator topOp = subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue();
+        ILogicalOperator opToVisit = topOp.getInputs().get(0).getValue();
+        ILogicalOperator result = opToVisit.accept(visitor, null);
+        topOp.getInputs().get(0).setValue(result);
+
+        // Substitute variables in topOp if necessary.
+        VariableUtilities.substituteVariables(topOp, visitor.getVariableMapHistory(), context);
+
+        // Gets ordering variables.
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> orderVars = visitor.getOrderingExpressions();
+        return new Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>>(
+                visitor.getInputVariableToOutputVariableMap(), orderVars);
+    }
+
+    /**
+     * Inlines the left NestedTupleSource of a subplan that qualifies for the special join-based flattening
+     * (see {@code isQualifiedForSpecialFlattening}), then inlines the remaining NestedTupleSource operators
+     * as in the general case.
+     *
+     * @param subplanOp
+     *            the SubplanOperator
+     * @param context
+     *            the optimization context
+     * @return a pair of (1) a set of variables used for further null-checks, i.e., variables indicating
+     *         whether a tuple produced by a transformed left outer join is a non-match, and (2) a
+     *         reference to the top join operator in the nested subplan; {@code (null, null)} with no
+     *         rewriting if the subplan does not qualify.
+     * @throws AlgebricksException
+     *             if rewriting the nested plan or substituting variables fails
+     */
+    public static Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>> inlineLeftNtsInSubplanJoin(
+            SubplanOperator subplanOp, IOptimizationContext context) throws AlgebricksException {
+        Pair<Boolean, ILogicalOperator> applicableAndNtsToRewrite = isQualifiedForSpecialFlattening(subplanOp);
+        if (!applicableAndNtsToRewrite.first) {
+            return new Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>>(null, null);
+        }
+
+        ILogicalOperator qualifiedNts = applicableAndNtsToRewrite.second;
+        ILogicalOperator subplanInputOp = subplanOp.getInputs().get(0).getValue();
+        InlineLeftNtsInSubplanJoinFlatteningVisitor specialVisitor = new InlineLeftNtsInSubplanJoinFlatteningVisitor(
+                context, subplanInputOp, qualifiedNts);
+
+        // Rewrites the query plan. The special visitor doesn't replace any input or local variables.
+        Mutable<ILogicalOperator> topRef = subplanOp.getNestedPlans().get(0).getRoots().get(0);
+        ILogicalOperator result = topRef.getValue().accept(specialVisitor, null);
+        Mutable<ILogicalOperator> topJoinRef = specialVisitor.getTopJoinReference();
+        topRef.setValue(result);
+
+        // Inlines the remaining NTSs as in the general case, starting from the top join.
+        InlineAllNtsInSubplanVisitor generalVisitor = new InlineAllNtsInSubplanVisitor(context, subplanOp);
+        ILogicalOperator opToVisit = topJoinRef.getValue();
+        result = opToVisit.accept(generalVisitor, null);
+        topJoinRef.setValue(result);
+
+        // Substitutes variables in the nested pipeline above the top join operator if necessary,
+        // walking down input 0 from the nested plan's root until the rewritten top join is reached.
+        List<Pair<LogicalVariable, LogicalVariable>> subplanLocalVarMap = generalVisitor.getVariableMapHistory();
+        ILogicalOperator currentOp = topRef.getValue();
+        while (currentOp != result) {
+            VariableUtilities.substituteVariables(currentOp, subplanLocalVarMap, context);
+            currentOp = currentOp.getInputs().get(0).getValue();
+        }
+        return new Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>>(specialVisitor.getNullCheckVariables(),
+                topJoinRef);
+    }
+
+    /**
+     * Checks whether the nested plans of a subplan operator contain an operator of interest.
+     *
+     * @param subplanOp
+     *            a SubplanOperator
+     * @param interestedOperatorTags
+     *            the operator tags to look for
+     * @return true if some operator in the nested plans of {@code subplanOp}, including operators inside
+     *         further nested subplans, has a tag in {@code interestedOperatorTags}; false otherwise.
+     */
+    public static boolean containsOperators(SubplanOperator subplanOp, Set<LogicalOperatorTag> interestedOperatorTags) {
+        List<ILogicalPlan> nestedPlans = subplanOp.getNestedPlans();
+        for (ILogicalPlan nestedPlan : nestedPlans) {
+            for (Mutable<ILogicalOperator> opRef : nestedPlan.getRoots()) {
+                if (containsOperatorsInternal(opRef.getValue(), interestedOperatorTags)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Determines whether a subplan could be rewritten as a join-related special case.
+     * The conditions include:
+     * a. there is a join (let's call it J1.) in the nested plan,
+     * b. if J1 is an inner join, one input pipeline of J1 has a NestedTupleSource descendant (let's call it N1),
+     * c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1),
+     * d. there is no tuple dropping from N1 to J1.
+     *
+     * @param subplanOp
+     *            the SubplanOperator to consider
+     * @return {@code (true, N1)} if the rewriting is applicable; {@code (false, null)} otherwise.
+     * @throws AlgebricksException
+     *             propagated from the plan checks
+     */
+    private static Pair<Boolean, ILogicalOperator> isQualifiedForSpecialFlattening(SubplanOperator subplanOp)
+            throws AlgebricksException {
+        if (!OperatorManipulationUtil.ancestorOfOperators(
+                subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue(),
+                // we don't need to check recursively for this special rewriting.
+                ImmutableSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) {
+            return new Pair<Boolean, ILogicalOperator>(false, null);
+        }
+        SubplanSpecialFlatteningCheckVisitor visitor = new SubplanSpecialFlatteningCheckVisitor();
+        for (ILogicalPlan plan : subplanOp.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
+                if (!opRef.getValue().accept(visitor, null)) {
+                    return new Pair<Boolean, ILogicalOperator>(false, null);
+                }
+            }
+        }
+        return new Pair<Boolean, ILogicalOperator>(true, visitor.getQualifiedNts());
+    }
+
+    /**
+     * Whether the query plan rooted at {@code currentOp} contains an operator whose tag is in
+     * {@code interestedOperatorTags}, also looking into the nested plans of subplan operators.
+     *
+     * @param currentOp
+     *            the current operator
+     * @param interestedOperatorTags
+     *            the operator tags to look for
+     * @return true if such an operator is found; false otherwise.
+     */
+    private static boolean containsOperatorsInternal(ILogicalOperator currentOp,
+            Set<LogicalOperatorTag> interestedOperatorTags) {
+        if (interestedOperatorTags.contains(currentOp.getOperatorTag())) {
+            return true;
+        }
+        if (currentOp.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+            if (containsOperators((SubplanOperator) currentOp, interestedOperatorTags)) {
+                return true;
+            }
+        }
+        for (Mutable<ILogicalOperator> childRef : currentOp.getInputs()) {
+            if (containsOperatorsInternal(childRef.getValue(), interestedOperatorTags)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
new file mode 100644
index 0000000..1a2e8bb
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/**
+ * This visitor determines whether a nested subplan in a SubplanOperator could be rewritten as
+ * a join-related special case.
+ * The conditions include:
+ * a. there is a join (let's call it J1.) in the nested plan,
+ * b. if J1 is an inner join, one input pipeline of J1 has a NestedTupleSource descendant (let's call it N1),
+ * c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1),
+ * d. there is no tuple dropping from N1 to J1.
+ */
+class SubplanSpecialFlatteningCheckVisitor implements IQueryOperatorVisitor<Boolean, Void> {
+    // Accept with an assumption that there doesn't exist an ancestor operator like J1 of the visiting
+    // tuple discarding or cardinality reducing operator.
+    // That is, a tuple discarding or cardinality operator like SelectOperator could not be
+    // on the path from N1 to J1, but could be on top of J1.
+    // For instance, during the post order visiting, if we hit a SelectOperator, we set rejectPending
+    // bit to be true, and later if we backtrack to J1 which is an ancestor of N1, we think the path
+    // from N1 to J1 is not valid because condition d is not met.
+    // Then we reset rejectPending to false and traverse another child of J1.
+    private boolean rejectPending = false;
+
+    // The qualified NTS to be replaced.
+    private ILogicalOperator qualifiedNtsOp;
+
+    public ILogicalOperator getQualifiedNts() {
+        return qualifiedNtsOp;
+    }
+
+    @Override
+    public Boolean visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        return visitCardinalityReduceOperator(op);
+    }
+
+    @Override
+    public Boolean visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+        return visitCardinalityReduceOperator(op);
+    }
+
+    @Override
+    public Boolean visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        return visitCardinalityReduceOperator(op);
+    }
+
+    @Override
+    public Boolean visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        return visitTupleDiscardingOperator(op);
+    }
+
+    @Override
+    public Boolean visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            if (childRef.getValue().accept(this, null) && !rejectPending) {
+                return true;
+            }
+            rejectPending = false;
+        }
+        return false;
+    }
+
+    @Override
+    public Boolean visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        // Check whether the left branch is a qualified branch.
+        boolean isLeftBranchQualified = op.getInputs().get(0).getValue().accept(this, null);
+        return !rejectPending && isLeftBranchQualified;
+    }
+
+    @Override
+    public Boolean visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+        qualifiedNtsOp = op;
+        return true;
+    }
+
+    @Override
+    public Boolean visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        return visitTupleDiscardingOperator(op);
+    }
+
+    @Override
+    public Boolean visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        // Flattening with an Union Operator in the pipeline will perturb the query semantics,
+        // e.g., the result cardinality can change.
+        return false;
+    }
+
+    @Override
+    public Boolean visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public Boolean visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    @Override
+    public Boolean visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
+    private boolean visitInputs(ILogicalOperator op) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            if (childRef.getValue().accept(this, null)) {
+                // One input is qualified.
+                return true;
+            }
+        }
+        // All inputs are disqualified.
+        return false;
+    }
+
+    /**
+     * If an operator reduces its input cardinality, the operator should not be a descendant
+     * of a join operator.
+     *
+     * @param op,
+     *            the operator to consider
+     * @return FALSE if it is certainly disqualified; TRUE otherwise.
+     * @throws AlgebricksException
+     */
+    private Boolean visitCardinalityReduceOperator(ILogicalOperator op) throws AlgebricksException {
+        return visitTupleDiscardingOrCardinalityReduceOperator(op);
+    }
+
+    /**
+     * If an operator discard tuples variables before the join, the query's
+     * semantics cannot be preserved after applying the <code>FlatternSubplanJoinRule</code> rule.
+     *
+     * @param op,
+     *            the operator to consider
+     * @return FALSE if it is certainly disqualified; TRUE otherwise.
+     * @throws AlgebricksException
+     */
+    private Boolean visitTupleDiscardingOperator(ILogicalOperator op) throws AlgebricksException {
+        return visitTupleDiscardingOrCardinalityReduceOperator(op);
+    }
+
+    private boolean visitTupleDiscardingOrCardinalityReduceOperator(ILogicalOperator op) throws AlgebricksException {
+        boolean result = visitInputs(op);
+        rejectPending = true;
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
index 7b29ba3..9f98da0 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
@@ -20,8 +20,10 @@
 package org.apache.asterix.optimizer.rules.util;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.lang.common.util.FunctionUtil;
@@ -31,6 +33,8 @@ import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
@@ -39,8 +43,13 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
 import org.mortbay.util.SingletonList;
 
 public class EquivalenceClassUtils {
@@ -99,4 +108,51 @@ public class EquivalenceClassUtils {
         }
     }
 
+    /**
+     * Find the header variables that can imply all subplan-local live variables at <code>operator</code>.
+     * <p>
+     * Functional dependencies are computed for the plan rooted at <code>operator</code> and scanned in
+     * order. Their heads, restricted to the subplan-local live variables, are accumulated until the
+     * union of their tails covers every subplan-local live variable. At that point the accumulated
+     * heads are returned. If the tails never cover all live variables, the method falls back to every
+     * subplan-local live variable whose type is not a record, ordered list or unordered list.
+     * <p>
+     * Side effect: all functional dependencies and equivalence classes held by <code>context</code> are
+     * cleared. The functional dependency objects themselves are not modified.
+     *
+     * @param context
+     *            the optimization context.
+     * @param operator
+     *            the operator of interest.
+     * @return a set of covering variables that can imply all subplan-local live variables at <code>operator</code>,
+     *         or, if none can be derived from the functional dependencies, the live variables of non-complex types.
+     * @throws AlgebricksException
+     *             propagated from the FD computation, the live-variable collection or the type lookups.
+     */
+    public static Set<LogicalVariable> findFDHeaderVariables(IOptimizationContext context, ILogicalOperator operator)
+            throws AlgebricksException {
+        PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses((AbstractLogicalOperator) operator, context);
+        List<FunctionalDependency> fds = context.getFDList(operator);
+        // The FDs are only needed for this lookup; drop them from the context right away.
+        context.clearAllFDAndEquivalenceClasses();
+
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getSubplanLocalLiveVariables(operator, liveVars);
+
+        Set<LogicalVariable> key = new HashSet<>();
+        Set<LogicalVariable> cover = new HashSet<>();
+        for (FunctionalDependency fd : fds) {
+            // Take the live part of the head without mutating the FunctionalDependency's own head list.
+            for (LogicalVariable headVar : fd.getHead()) {
+                if (liveVars.contains(headVar)) {
+                    key.add(headVar);
+                }
+            }
+            // NOTE(review): only tails count as covered; head variables are not added to the cover.
+            // Confirm this is intended.
+            cover.addAll(fd.getTail());
+            if (cover.containsAll(liveVars)) {
+                return key;
+            }
+        }
+        if (liveVars.isEmpty()) {
+            // Nothing needs covering (only reachable when there are no FDs), so the empty key suffices.
+            return key;
+        }
+
+        // Fallback: the FD tails do not cover all live variables, so use every live variable except
+        // those of complex types (presumably because they cannot serve as keys -- TODO confirm).
+        // NOTE(review): a nullable record/list is typed as a UNION and is therefore kept; a null
+        // result from getVarType would throw a NullPointerException here.
+        IVariableTypeEnvironment env = context.getOutputTypeEnvironment(operator);
+        Set<LogicalVariable> keyVars = new HashSet<>();
+        for (LogicalVariable var : liveVars) {
+            IAType type = (IAType) env.getVarType(var);
+            ATypeTag typeTag = type.getTypeTag();
+            if (typeTag == ATypeTag.RECORD || typeTag == ATypeTag.ORDEREDLIST
+                    || typeTag == ATypeTag.UNORDEREDLIST) {
+                continue;
+            }
+            keyVars.add(var);
+        }
+        return keyVars;
+    }
 }