You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/01/28 05:46:45 UTC
[6/7] incubator-asterixdb git commit: ASTERIXDB-1005,
ASTERIXDB-1263: Clean up subplan flattening: 1. Inline
NestedTupleSource and remove SubplanOperator for special cases that join
operators inside the SubplanOperator can be re-targeted for correl
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index c5e99b2..d88b69f 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -46,28 +47,33 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperat
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
-
-/***
- * This rule inlines (deep copies) a SubplanOperator's input query plan to
- * replace its NestedTupleSources.
- * Then, the SubplanOperator is replaced by:
- * 1. a LeftOuterOperatorJoin between the SubplanOperator's input operator and the
- * SubplanOperator's root operator's input.
- * 2. and on top of the LeftOuterJoinOperator, a GroupByOperaptor in which the
- * nested plan consists of the SubplanOperator's root operator.
- */
-/*
-This is an abstract example for this rule:
+import com.google.common.collect.ImmutableSet;
+
+/*
+This rule is to remove SubplanOperators containing DataScan, InnerJoin, LeftOuterJoin, UnionAll or Distinct. Given a qualified Subplan operator called S1,
+Let's call its input operator O1.
+
+General Cases
+We have the following rewritings for general cases:
+R1. Replace all NestedTupleSourceOperators in S1 with deep-copies (with new variables) of the query plan rooted at O1;
+R2. Add a LeftOuterJoinOperator (let's call it LJ) between O1 and the SubplanOperator's root operator's input (let's call it SO1),
+ where O1 is the left branch and SO1 is the right branch;
+R3. The deep copy of the primary key variables in O1 should be preserved from an inlined NestedTupleSourceOperator to SO1.
+ The join condition of LJ is the equality between the primary key variables in O1 and its deep copied version at SO1;
+R4. A variable v indicating non-match tuples is assigned to TRUE between LJ and SO1;
+R5. On top of the LJ, add a GroupByOperator in which the nested plan consists of S1's root operator, i.e., an aggregate operator.
+ Below the aggregate, there is a not-null-filter on variable v. The group key is the primary key variables in O1.
+
+This is an abstract example for the rewriting mechanism described above:
Before rewriting:
--Op1
--Subplan{
@@ -96,190 +102,157 @@ After rewriting:
.....
--Deepcopy_The_Plan_Rooted_At_InputOp_With_New_Variables(InputOp)
-In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp,
-while v_rc_1, ..., v_rc_n are their corresponding variables populated from the deepcopy of InputOp.
-("Covering" variables form a set of variables that can imply all live variables.)
-v_l1, ....v_ln in the decoration part of the added group-by operator are all
-live variables at InputOp except the covering variables v_lc_1, ..., v_lc_n.
-
-TODO(buyingyi): the rewritten plan is wrong when there are duplicate tuples from InputOp: ASTERIXDB-1168.
-
-Here are two concrete examples. (The top child of a join operator is the outer branch.)
----------- Example 1 -----------
-FINE: >>>> Before plan
-distribute result [%0->$$27] -- |UNPARTITIONED|
- project ([$$27]) -- |UNPARTITIONED|
- assign [$$27] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$37, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$6]] -- |UNPARTITIONED|
- unnest $$6 <- function-call: asterix:scan-collection, Args:[%0->$$26] -- |UNPARTITIONED|
- subplan {
- aggregate [$$26] <- [function-call: asterix:listify, Args:[%0->$$22]] -- |UNPARTITIONED|
- join (TRUE) -- |UNPARTITIONED|
- select (%0->$$21) -- |UNPARTITIONED|
- group by ([$$30 := %0->$$35]) decor ([%0->$$5; %0->$$7; %0->$$8; %0->$$31]) {
- aggregate [$$21] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
- select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$34]]) -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
- left outer join (function-call: algebricks:eq, Args:[%0->$$36, %0->$$7]) -- |UNPARTITIONED|
- data-scan []<-[$$31, $$8] <- emergencyTest:CHPReports -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
- assign [$$34] <- [TRUE] -- |UNPARTITIONED|
- assign [$$36] <- [function-call: asterix:field-access-by-index, Args:[%0->$$10, AInt32: {1}]] -- |UNPARTITIONED|
- data-scan []<-[$$32, $$10] <- emergencyTest:userLocations -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$22] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$25]] -- |UNPARTITIONED|
- aggregate [$$25] <- [function-call: asterix:listify, Args:[%0->$$24]] -- |UNPARTITIONED|
- assign [$$24] <- [function-call: asterix:field-access-by-index, Args:[%0->$$11, AInt32: {1}]] -- |UNPARTITIONED|
- data-scan []<-[$$33, $$11] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
- assign [$$7] <- [function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$37] <- [function-call: asterix:field-access-by-name, Args:[%0->$$5, AString: {subscription-id}]] -- |UNPARTITIONED|
- data-scan []<-[$$35, $$5] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
-
+In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp, while v_rc_1, ..., v_rc_n are their corresponding variables populated from the deepcopy of InputOp.
+"Covering" variables form a set of variables that can imply all live variables. v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables
+at InputOp except the covering variables v_lc_1, ..., v_lc_n. In the current implementation, we use "covering" variables as primary key variables. In the next version, we
+will use the real primary key variables, which will fix ASTERIXDB-1168.
-Dec 22, 2015 4:39:22 PM org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController printRuleApplication
-FINE: >>>> After plan
-distribute result [%0->$$27] -- |UNPARTITIONED|
- project ([$$27]) -- |UNPARTITIONED|
- assign [$$27] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$37, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$6]] -- |UNPARTITIONED|
- unnest $$6 <- function-call: asterix:scan-collection, Args:[%0->$$26] -- |UNPARTITIONED|
- group by ([$$43 := %0->$$35]) decor ([%0->$$5; %0->$$37; %0->$$7]) {
- aggregate [$$26] <- [function-call: asterix:listify, Args:[%0->$$22]] -- |UNPARTITIONED|
- select (function-call: algebricks:not-null, Args:[%0->$$42]) -- |UNPARTITIONED|
+Here is a concrete example of the general case rewriting (optimizerts/queries/nested_loj4.aql).
+Before plan:
+distribute result [%0->$$13] -- |UNPARTITIONED|
+ project ([$$13]) -- |UNPARTITIONED|
+ assign [$$13] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$12]] -- |UNPARTITIONED|
+ subplan {
+ aggregate [$$12] <- [function-call: asterix:listify, Args:[%0->$$1]] -- |UNPARTITIONED|
+ join (function-call: algebricks:eq, Args:[%0->$$16, %0->$$14]) -- |UNPARTITIONED|
+ select (function-call: algebricks:eq, Args:[%0->$$18, AInt64: {5}]) -- |UNPARTITIONED|
nested tuple source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
- left outer join (function-call: algebricks:eq, Args:[%0->$$35, %0->$$30]) -- |UNPARTITIONED|
- assign [$$7] <- [function-call: asterix:field-access-by-index, Args:[%0->$$5, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$37] <- [function-call: asterix:field-access-by-name, Args:[%0->$$5, AString: {subscription-id}]] -- |UNPARTITIONED|
- data-scan []<-[$$35, $$5] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$42] <- [TRUE] -- |UNPARTITIONED|
- join (TRUE) -- |UNPARTITIONED|
- select (%0->$$21) -- |UNPARTITIONED|
- group by ([$$30 := %0->$$41]) decor ([%0->$$39; %0->$$38; %0->$$8; %0->$$31]) {
- aggregate [$$21] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
- select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$34]]) -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
- left outer join (function-call: algebricks:eq, Args:[%0->$$36, %0->$$38]) -- |UNPARTITIONED|
- data-scan []<-[$$31, $$8] <- emergencyTest:CHPReports -- |UNPARTITIONED|
- assign [$$38] <- [function-call: asterix:field-access-by-index, Args:[%0->$$39, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$40] <- [function-call: asterix:field-access-by-name, Args:[%0->$$39, AString: {subscription-id}]] -- |UNPARTITIONED|
- data-scan []<-[$$41, $$39] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$34] <- [TRUE] -- |UNPARTITIONED|
- assign [$$36] <- [function-call: asterix:field-access-by-index, Args:[%0->$$10, AInt32: {1}]] -- |UNPARTITIONED|
- data-scan []<-[$$32, $$10] <- emergencyTest:userLocations -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$22] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$25]] -- |UNPARTITIONED|
- aggregate [$$25] <- [function-call: asterix:listify, Args:[%0->$$24]] -- |UNPARTITIONED|
- assign [$$24] <- [function-call: asterix:field-access-by-index, Args:[%0->$$11, AInt32: {1}]] -- |UNPARTITIONED|
- data-scan []<-[$$33, $$11] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
---------------------------------
-
----------- Example 2 -----------
-FINE: >>>> Before plan
-distribute result [%0->$$8] -- |UNPARTITIONED|
- project ([$$8]) -- |UNPARTITIONED|
- unnest $$8 <- function-call: asterix:scan-collection, Args:[%0->$$41] -- |UNPARTITIONED|
+ assign [$$16] <- [function-call: asterix:field-access-by-name, Args:[%0->$$19, AString: {o_custkey}]] -- |UNPARTITIONED|
+ assign [$$19] <- [function-call: asterix:field-access-by-name, Args:[%0->$$1, AString: {o_$o}]] -- |UNPARTITIONED|
+ data-scan []<-[$$15, $$1] <- tpch:Orders -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+ } -- |UNPARTITIONED|
+ assign [$$18] <- [function-call: asterix:field-access-by-index, Args:[%0->$$0, AInt32: {3}]] -- |UNPARTITIONED|
+ data-scan []<-[$$14, $$0] <- tpch:Customers -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+
+ After plan:
+distribute result [%0->$$13] -- |UNPARTITIONED|
+ project ([$$13]) -- |UNPARTITIONED|
+ assign [$$13] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$12]] -- |UNPARTITIONED|
+ group by ([$$24 := %0->$$14]) decor ([%0->$$0; %0->$$18]) {
+ aggregate [$$12] <- [function-call: asterix:listify, Args:[%0->$$1]] -- |UNPARTITIONED|
+ select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$23]]) -- |UNPARTITIONED|
+ nested tuple source -- |UNPARTITIONED|
+ } -- |UNPARTITIONED|
+ left outer join (function-call: algebricks:eq, Args:[%0->$$14, %0->$$22]) -- |UNPARTITIONED|
+ assign [$$18] <- [function-call: asterix:field-access-by-index, Args:[%0->$$0, AInt32: {3}]] -- |UNPARTITIONED|
+ data-scan []<-[$$14, $$0] <- tpch:Customers -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+ assign [$$23] <- [TRUE] -- |UNPARTITIONED|
+ join (function-call: algebricks:eq, Args:[%0->$$16, %0->$$22]) -- |UNPARTITIONED|
+ select (function-call: algebricks:eq, Args:[%0->$$20, AInt64: {5}]) -- |UNPARTITIONED|
+ assign [$$20] <- [function-call: asterix:field-access-by-index, Args:[%0->$$21, AInt32: {3}]] -- |UNPARTITIONED|
+ data-scan []<-[$$22, $$21] <- tpch:Customers -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+ assign [$$16] <- [function-call: asterix:field-access-by-name, Args:[%0->$$19, AString: {o_custkey}]] -- |UNPARTITIONED|
+ assign [$$19] <- [function-call: asterix:field-access-by-name, Args:[%0->$$1, AString: {o_$o}]] -- |UNPARTITIONED|
+ data-scan []<-[$$15, $$1] <- tpch:Orders -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+
+Special Cases
+For special cases where:
+a. there is a join (let's call it J1.) in the nested plan,
+b. if J1 is an inner join, one input pipeline of J1 has a NestedTupleSource descendant (let's call it N1),
+c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1),
+d. there is no tuple dropping from N1 to J1
+
+Rewriting R2 is not necessary since before J1, all tuples from N1 are preserved. But the following rewritings are needed:
+R1'. Replace N1 by the O1 (no additional deep copy);
+R2'. All inner joins on the path from N1 to J1 (including J1) become left-outer joins with the same join conditions;
+R3'. If N1 resides in the right branch of an inner join (let's call it J2) in the path from N1 to J1, switch the left and right branches of J2;
+R4'. For every left join from N1 to J1 transformed from an inner join, a variable vi indicating non-match tuples is assigned to TRUE in its right branch;
+R5'. On top of J1, a GroupByOperator G1 is added where the group-by key is the primary key of O1 and the nested query plan for aggregation is the nested pipeline
+ on top of J1 with an added not-null-filter to check all vi are not null.
+R6'. All other NestedTupleSourceOperators in the subplan are inlined with deep copies (with new variables) of the query plan rooted at O1.
+
+This is an abstract example for the special rewriting mechanism described above:
+Before rewriting:
+--Op1
+ --Subplan{
+ --AggregateOp
+ --NestedOp
+              -- Inner Join (J1)
+                   -- (Right branch) ..... (L1)
+                   -- (Left branch) ..... (R1)
+ --Nested-Tuple-Source
+ }
+ --InputOp
+ .....
+(Note that pipeline R1 must satisfy the condition that it does not drop any tuples.)
+After rewriting:
+-- Op1
+  -- GroupBy v_lc_1, ..., v_lc_n Decor v_l1, ....v_ln {
+            -- AggregateOp
+              -- NestedOp
+                -- Select v_new!=NULL
+                   -- Nested-Tuple-Source
+          }
+     --LeftOuterJoin (J1)
+       (left branch)
+          -- ...... (R1)
+               -- InputOp
+                 .....
+       (right branch)
+             -- Assign v_new=TRUE
+                -- ..... (L1)
+
+In the plan, v_lc_1, ..., v_lc_n are live "covering" variables at InputOp and v_l1, ....v_ln in the decoration part of the added group-by operator are all live variables
+at InputOp except the covering variables v_lc_1, ..., v_lc_n. In the current implementation, we use "covering" variables as primary key variables. In the next version,
+we will use the real primary key variables, which will fix ASTERIXDB-1168.
+
+Here is a concrete example (optimizerts/queries/nested_loj2.aql).
+Before plan:
+distribute result [%0->$$17] -- |UNPARTITIONED|
+ project ([$$17]) -- |UNPARTITIONED|
+ assign [$$17] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$16]] -- |UNPARTITIONED|
subplan {
- aggregate [$$41] <- [function-call: asterix:listify, Args:[%0->$$38]] -- |UNPARTITIONED|
- assign [$$38] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$54, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$19]] -- |UNPARTITIONED|
- unnest $$19 <- function-call: asterix:scan-collection, Args:[%0->$$37] -- |UNPARTITIONED|
- subplan {
- aggregate [$$37] <- [function-call: asterix:listify, Args:[%0->$$33]] -- |UNPARTITIONED|
- join (TRUE) -- |UNPARTITIONED|
- select (%0->$$32) -- |UNPARTITIONED|
- group by ([$$16 := %0->$$47; $$43 := %0->$$48; $$15 := %0->$$49]) decor ([%0->$$7; %0->$$42; %0->$$14]) {
- aggregate [$$32] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
- select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$46]]) -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
- left outer join (function-call: algebricks:and, Args:[function-call: algebricks:eq, Args:[%0->$$50, %0->$$14], function-call: asterix:spatial-intersect, Args:[%0->$$47, %0->$$53]]) -- |UNPARTITIONED|
- assign [$$47] <- [function-call: asterix:create-circle, Args:[%0->$$51, %0->$$52]] -- |UNPARTITIONED|
- assign [$$51] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$52] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {2}]] -- |UNPARTITIONED|
- data-scan []<-[$$48, $$49] <- emergencyTest:CHPReports -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
- assign [$$46] <- [TRUE] -- |UNPARTITIONED|
- assign [$$50] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$53] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {2}]] -- |UNPARTITIONED|
- data-scan []<-[$$44, $$17] <- emergencyTest:userLocations -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$33] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$36]] -- |UNPARTITIONED|
- aggregate [$$36] <- [function-call: asterix:listify, Args:[%0->$$35]] -- |UNPARTITIONED|
- assign [$$35] <- [function-call: asterix:field-access-by-index, Args:[%0->$$18, AInt32: {1}]] -- |UNPARTITIONED|
- data-scan []<-[$$45, $$18] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
+ aggregate [$$16] <- [function-call: asterix:listify, Args:[%0->$$15]] -- |UNPARTITIONED|
+ assign [$$15] <- [function-call: asterix:open-record-constructor, Args:[AString: {order}, %0->$$1, AString: {items}, %0->$$14]] -- |UNPARTITIONED|
+ subplan {
+ aggregate [$$14] <- [function-call: asterix:listify, Args:[%0->$$2]] -- |UNPARTITIONED|
+ join (function-call: algebricks:eq, Args:[%0->$$20, %0->$$19]) -- |UNPARTITIONED|
+ nested tuple source -- |UNPARTITIONED|
+ data-scan []<-[$$20, $$21, $$2] <- tpch:LineItems -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+ } -- |UNPARTITIONED|
+ join (function-call: algebricks:eq, Args:[%0->$$22, %0->$$18]) -- |UNPARTITIONED|
nested tuple source -- |UNPARTITIONED|
+ assign [$$22] <- [function-call: asterix:field-access-by-index, Args:[%0->$$1, AInt32: {1}]] -- |UNPARTITIONED|
+ data-scan []<-[$$19, $$1] <- tpch:Orders -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
} -- |UNPARTITIONED|
- assign [$$14] <- [function-call: asterix:field-access-by-index, Args:[%0->$$7, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$54] <- [function-call: asterix:field-access-by-name, Args:[%0->$$7, AString: {subscription-id}]] -- |UNPARTITIONED|
- data-scan []<-[$$42, $$7] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
-
-
-Dec 28, 2015 12:48:30 PM org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController printRuleApplication
-FINE: >>>> After plan
-distribute result [%0->$$8] -- |UNPARTITIONED|
- project ([$$8]) -- |UNPARTITIONED|
- unnest $$8 <- function-call: asterix:scan-collection, Args:[%0->$$41] -- |UNPARTITIONED|
- group by ([$$60 := %0->$$42]) decor ([%0->$$54; %0->$$7; %0->$$14]) {
- aggregate [$$41] <- [function-call: asterix:listify, Args:[%0->$$38]] -- |UNPARTITIONED|
- select (function-call: algebricks:not-null, Args:[%0->$$59]) -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
+ data-scan []<-[$$18, $$0] <- tpch:Customers -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+
+After plan:
+distribute result [%0->$$17] -- |UNPARTITIONED|
+ project ([$$17]) -- |UNPARTITIONED|
+ assign [$$17] <- [function-call: asterix:open-record-constructor, Args:[AString: {cust}, %0->$$0, AString: {orders}, %0->$$16]] -- |UNPARTITIONED|
+ group by ([$$30 := %0->$$18]) decor ([%0->$$0]) {
+ aggregate [$$16] <- [function-call: asterix:listify, Args:[%0->$$15]] -- |UNPARTITIONED|
+ assign [$$15] <- [function-call: asterix:open-record-constructor, Args:[AString: {order}, %0->$$1, AString: {items}, %0->$$14]] -- |UNPARTITIONED|
+ group by ([$$27 := %0->$$19]) decor ([%0->$$0; %0->$$1; %0->$$18; %0->$$22]) {
+ aggregate [$$14] <- [function-call: asterix:listify, Args:[%0->$$2]] -- |UNPARTITIONED|
+ select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$26]]) -- |UNPARTITIONED|
+ nested tuple source -- |UNPARTITIONED|
+ } -- |UNPARTITIONED|
+ select (function-call: algebricks:and, Args:[function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$28]], function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$29]]]) -- |UNPARTITIONED|
+ nested tuple source -- |UNPARTITIONED|
} -- |UNPARTITIONED|
- left outer join (function-call: algebricks:eq, Args:[%0->$$42, %0->$$58]) -- |UNPARTITIONED|
- assign [$$14] <- [function-call: asterix:field-access-by-index, Args:[%0->$$7, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$54] <- [function-call: asterix:field-access-by-name, Args:[%0->$$7, AString: {subscription-id}]] -- |UNPARTITIONED|
- data-scan []<-[$$42, $$7] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
+ left outer join (function-call: algebricks:eq, Args:[%0->$$20, %0->$$19]) -- |UNPARTITIONED|
+ left outer join (function-call: algebricks:eq, Args:[%0->$$22, %0->$$18]) -- |UNPARTITIONED|
+ data-scan []<-[$$18, $$0] <- tpch:Customers -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+ assign [$$28] <- [TRUE] -- |UNPARTITIONED|
+ assign [$$22] <- [function-call: asterix:field-access-by-index, Args:[%0->$$1, AInt32: {1}]] -- |UNPARTITIONED|
+ data-scan []<-[$$19, $$1] <- tpch:Orders -- |UNPARTITIONED|
+ empty-tuple-source -- |UNPARTITIONED|
+ assign [$$29] <- [TRUE] -- |UNPARTITIONED|
+ assign [$$26] <- [TRUE] -- |UNPARTITIONED|
+ data-scan []<-[$$20, $$21, $$2] <- tpch:LineItems -- |UNPARTITIONED|
empty-tuple-source -- |UNPARTITIONED|
- assign [$$59] <- [TRUE] -- |UNPARTITIONED|
- assign [$$38] <- [function-call: asterix:open-record-constructor, Args:[AString: {subscription-id}, %0->$$57, AString: {execution-time}, function-call: asterix:current-datetime, Args:[], AString: {result}, %0->$$19]] -- |UNPARTITIONED|
- unnest $$19 <- function-call: asterix:scan-collection, Args:[%0->$$37] -- |UNPARTITIONED|
- group by ([$$66 := %0->$$58]) decor ([%0->$$55; %0->$$56; %0->$$57]) {
- aggregate [$$37] <- [function-call: asterix:listify, Args:[%0->$$33]] -- |UNPARTITIONED|
- select (function-call: algebricks:not-null, Args:[%0->$$65]) -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
- left outer join (function-call: algebricks:eq, Args:[%0->$$58, %0->$$64]) -- |UNPARTITIONED|
- assign [$$55] <- [function-call: asterix:field-access-by-index, Args:[%0->$$56, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$57] <- [function-call: asterix:field-access-by-name, Args:[%0->$$56, AString: {subscription-id}]] -- |UNPARTITIONED|
- data-scan []<-[$$58, $$56] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$65] <- [TRUE] -- |UNPARTITIONED|
- join (TRUE) -- |UNPARTITIONED|
- select (%0->$$32) -- |UNPARTITIONED|
- group by ([$$16 := %0->$$47; $$43 := %0->$$48; $$15 := %0->$$49]) decor ([%0->$$62; %0->$$64; %0->$$61]) {
- aggregate [$$32] <- [function-call: asterix:non-empty-stream, Args:[]] -- |UNPARTITIONED|
- select (function-call: algebricks:not, Args:[function-call: algebricks:is-null, Args:[%0->$$46]]) -- |UNPARTITIONED|
- nested tuple source -- |UNPARTITIONED|
- } -- |UNPARTITIONED|
- left outer join (function-call: algebricks:and, Args:[function-call: algebricks:eq, Args:[%0->$$50, %0->$$61], function-call: asterix:spatial-intersect, Args:[%0->$$47, %0->$$53]]) -- |UNPARTITIONED|
- assign [$$47] <- [function-call: asterix:create-circle, Args:[%0->$$51, %0->$$52]] -- |UNPARTITIONED|
- assign [$$51] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$52] <- [function-call: asterix:field-access-by-index, Args:[%0->$$49, AInt32: {2}]] -- |UNPARTITIONED|
- data-scan []<-[$$48, $$49] <- emergencyTest:CHPReports -- |UNPARTITIONED|
- assign [$$61] <- [function-call: asterix:field-access-by-index, Args:[%0->$$62, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$63] <- [function-call: asterix:field-access-by-name, Args:[%0->$$62, AString: {subscription-id}]] -- |UNPARTITIONED|
- data-scan []<-[$$64, $$62] <- emergencyTest:NearbySheltersDuringTornadoDangerChannelSubscriptions -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$46] <- [TRUE] -- |UNPARTITIONED|
- assign [$$50] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {1}]] -- |UNPARTITIONED|
- assign [$$53] <- [function-call: asterix:field-access-by-index, Args:[%0->$$17, AInt32: {2}]] -- |UNPARTITIONED|
- data-scan []<-[$$44, $$17] <- emergencyTest:userLocations -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
- assign [$$33] <- [function-call: asterix:open-record-constructor, Args:[AString: {shelter locations}, %0->$$36]] -- |UNPARTITIONED|
- aggregate [$$36] <- [function-call: asterix:listify, Args:[%0->$$35]] -- |UNPARTITIONED|
- assign [$$35] <- [function-call: asterix:field-access-by-index, Args:[%0->$$18, AInt32: {1}]] -- |UNPARTITIONED|
- data-scan []<-[$$45, $$18] <- emergencyTest:tornadoShelters -- |UNPARTITIONED|
- empty-tuple-source -- |UNPARTITIONED|
----------------------------------
- */
+*/
public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRewriteRule {
// To make sure the rule only runs once.
@@ -306,54 +279,82 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
// Traverses non-subplan operators.
return traverseNonSubplanOperator(op, context);
}
+ /**
+ * Apply the special join-based rewriting.
+ */
+ Pair<Boolean, Map<LogicalVariable, LogicalVariable>> result = applySpecialFlattening(opRef, context);
+ if (!result.first) {
+ /**
+ * If the special join-based rewriting does not apply, apply the general
+ * rewriting which blindly inlines all NTSs.
+ */
+ result = applyGeneralFlattening(opRef, context);
+ }
+ return result;
+ }
- SubplanOperator subplanOp = (SubplanOperator) op;
- if (!containsDataSourceScan(subplanOp)) {
- // Traverses the operator as if it is not a subplan.
- return traverseNonSubplanOperator(op, context);
+ /***
+ * Deals with operators that are not SubplanOperator.
+ *
+ * @param op
+ * the operator to consider
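+     * @param context the optimization context passed on to the rewriting of each input
+     * @return whether any input was rewritten, paired with the variable-replacement entries whose keys are live at op
+     * @throws AlgebricksException if rewriting an input, substituting variables or recomputing the type environment fails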
+ */
+ private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> traverseNonSubplanOperator(ILogicalOperator op,
+ IOptimizationContext context) throws AlgebricksException {
+ Set<LogicalVariable> liveVars = new HashSet<>();
+ VariableUtilities.getLiveVariables(op, liveVars);
+ Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<LogicalVariable, LogicalVariable>();
+ Map<LogicalVariable, LogicalVariable> replacedVarMapForAncestor = new HashMap<LogicalVariable, LogicalVariable>();
+ boolean changed = false;
+ for (Mutable<ILogicalOperator> childrenRef : op.getInputs()) {
+ Pair<Boolean, Map<LogicalVariable, LogicalVariable>> resultFromChild = rewriteSubplanOperator(childrenRef,
+ context);
+ changed = changed || resultFromChild.first;
+ for (Map.Entry<LogicalVariable, LogicalVariable> entry : resultFromChild.second.entrySet()) {
+ if (liveVars.contains(entry.getKey())) {
+ // Only needs to map live variables for its ancestors.
+ replacedVarMapForAncestor.put(entry.getKey(), entry.getValue());
+ }
+ }
+ replacedVarMap.putAll(resultFromChild.second);
}
+ VariableUtilities.substituteVariables(op, replacedVarMap, context);
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(changed, replacedVarMapForAncestor);
+ }
- Mutable<ILogicalOperator> inputOpRef = op.getInputs().get(0);
+ private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> applyGeneralFlattening(Mutable<ILogicalOperator> opRef,
+ IOptimizationContext context) throws AlgebricksException {
+ SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
+ if (!SubplanFlatteningUtil.containsOperators(subplanOp,
+ ImmutableSet.of(LogicalOperatorTag.DATASOURCESCAN, LogicalOperatorTag.INNERJOIN,
+ // We don't have nested runtime for union-all and distinct hence we have to include them here.
+ LogicalOperatorTag.LEFTOUTERJOIN, LogicalOperatorTag.UNIONALL, LogicalOperatorTag.DISTINCT))) {
+ // Traverses the operator as if it is not a subplan.
+ return traverseNonSubplanOperator(subplanOp, context);
+ }
+ Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0);
ILogicalOperator inputOp = inputOpRef.getValue();
- Map<LogicalVariable, LogicalVariable> varMap = inlineNestedTupleSource(subplanOp, inputOp, context);
+ Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>> varMapAndOrderExprs = SubplanFlatteningUtil
+ .inlineAllNestedTupleSource(subplanOp, context);
+ Map<LogicalVariable, LogicalVariable> varMap = varMapAndOrderExprs.first;
+ if (varMap == null) {
+ // Traverses the operator as if it is not a subplan.
+ return traverseNonSubplanOperator(subplanOp, context);
+ }
// Creates parameters for the left outer join operator.
Set<LogicalVariable> inputLiveVars = new HashSet<LogicalVariable>();
VariableUtilities.getLiveVariables(inputOp, inputLiveVars);
- PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses((AbstractLogicalOperator) inputOp, context);
- List<FunctionalDependency> fds = context.getFDList(inputOp);
- Set<LogicalVariable> fdCoveringVars = findFDHeaderVariables(fds, inputLiveVars);
+ Set<LogicalVariable> fdCoveringVars = EquivalenceClassUtils.findFDHeaderVariables(context, inputOp);
Mutable<ILogicalOperator> rightInputOpRef = subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue()
.getInputs().get(0);
ILogicalOperator rightInputOp = rightInputOpRef.getValue();
- Set<LogicalVariable> rightInputLiveVars = new HashSet<LogicalVariable>();
- VariableUtilities.getLiveVariables(rightInputOp, rightInputLiveVars);
- Set<LogicalVariable> rightMissingCoveringVars = new HashSet<>();
- Set<LogicalVariable> varsToEnforce = new HashSet<>();
- for (LogicalVariable liveVar : fdCoveringVars) {
- LogicalVariable rightVar = varMap.get(liveVar);
- if (!rightInputLiveVars.contains(rightVar)) {
- // Some correlated variables killed in the subplan, therefore needs to be preserved in the subplan.
- varsToEnforce.add(rightVar);
- rightMissingCoveringVars.add(liveVar);
- }
- }
- // Recovers killed-variables in leftVars in the query plan rooted at rightInputOp.
- if (!varsToEnforce.isEmpty()) {
- Map<LogicalVariable, LogicalVariable> map = VariableUtilities
- .enforceVariablesInDescendantsAndSelf(rightInputOpRef, varsToEnforce, context);
- // Re-maps variables in the left input branch to the variables in the right input branch
- for (LogicalVariable var : rightMissingCoveringVars) {
- LogicalVariable rightVar = varMap.get(var);
- LogicalVariable newVar = map.get(rightVar);
- if (newVar != null) {
- varMap.put(var, newVar);
- }
- }
- }
-
// Creates a variable to indicate whether a left input tuple is killed in the plan rooted at rightInputOp.
LogicalVariable assignVar = context.newVar();
ILogicalOperator assignOp = new AssignOperator(assignVar,
@@ -390,10 +391,8 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<>();
- Map<LogicalVariable, LogicalVariable> gbyVarMap = new HashMap<LogicalVariable, LogicalVariable>();
for (LogicalVariable liveVar : fdCoveringVars) {
LogicalVariable newVar = context.newVar();
- gbyVarMap.put(liveVar, newVar);
groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
// Adds variables for replacements in ancestors.
@@ -411,6 +410,15 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
Mutable<ILogicalOperator> aggOpRef = subplanOp.getNestedPlans().get(0).getRoots().get(0);
aggOpRef.getValue().getInputs().clear();
+ Mutable<ILogicalOperator> currentOpRef = aggOpRef;
+ // Adds an optional order operator.
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = varMapAndOrderExprs.second;
+ if (!orderExprs.isEmpty()) {
+ OrderOperator orderOp = new OrderOperator(orderExprs);
+ currentOpRef = new MutableObject<ILogicalOperator>(orderOp);
+ aggOpRef.getValue().getInputs().add(currentOpRef);
+ }
+
// Adds a select operator into the nested plan for group-by to remove tuples with NULL on {@code assignVar}, i.e.,
// subplan input tuples that are filtered out within a subplan.
Mutable<ILogicalExpression> filterVarExpr = new MutableObject<ILogicalExpression>(
@@ -424,7 +432,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT), argsForNotFunction)),
false, null);
- aggOpRef.getValue().getInputs().add(new MutableObject<ILogicalOperator>(selectOp));
+ currentOpRef.getValue().getInputs().add(new MutableObject<ILogicalOperator>(selectOp));
selectOp.getInputs().add(new MutableObject<ILogicalOperator>(
new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(groupbyOp))));
@@ -432,181 +440,97 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
nestedRoots.add(aggOpRef);
nestedPlans.add(new ALogicalPlanImpl(nestedRoots));
groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(leftOuterJoinOp));
- OperatorManipulationUtil.computeTypeEnvironmentBottomUp(aggOpRef.getValue(), context);
// Replaces subplan with the group-by operator.
opRef.setValue(groupbyOp);
- context.computeAndSetTypeEnvironmentForOperator(groupbyOp);
+ OperatorManipulationUtil.computeTypeEnvironmentBottomUp(groupbyOp, context);
// Recursively applys this rule to the nested plan of the subplan operator,
// for the case where there are nested subplan operators within {@code subplanOp}.
- // Note that we do not need to use the resulting variable map to further replace variables,
- // because rightInputOp must be an aggregate operator which kills all incoming variables.
- traverseNonSubplanOperator(rightInputOp, context);
+ Pair<Boolean, Map<LogicalVariable, LogicalVariable>> result = rewriteSubplanOperator(rightInputOpRef, context);
+ VariableUtilities.substituteVariables(leftOuterJoinOp, result.second, context);
+ VariableUtilities.substituteVariables(groupbyOp, result.second, context);
+
+ // No var mapping from the right input operator should be propagated up.
return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(true, replacedVarMap);
}
- /**
- * @param subplanOp
- * a SubplanOperator
- * @return whether there is a data source scan in the nested logical plans of {@code subplanOp}.
- */
- private boolean containsDataSourceScan(SubplanOperator subplanOp) {
- List<ILogicalPlan> nestedPlans = subplanOp.getNestedPlans();
- for (ILogicalPlan nestedPlan : nestedPlans) {
- for (Mutable<ILogicalOperator> opRef : nestedPlan.getRoots()) {
- if (containsDataScanInDescendantsAndSelf(opRef.getValue())) {
- return true;
- }
- }
- }
- return false;
- }
+ private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> applySpecialFlattening(Mutable<ILogicalOperator> opRef,
+ IOptimizationContext context) throws AlgebricksException {
+ SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
+ ILogicalOperator inputOp = subplanOp.getInputs().get(0).getValue();
+ Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<>();
- /**
- * Whether the query plan rooted {@code currentOp} contains a data source scan operator,
- * with considering nested subplans.
- *
- * @param currentOp
- * the current operator
- * @return true if {@code currentOp} contains a data source scan operator; false otherwise.
- */
- private boolean containsDataScanInDescendantsAndSelf(ILogicalOperator currentOp) {
- if (currentOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
- return true;
- }
- if (currentOp.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
- if (containsDataSourceScan((SubplanOperator) currentOp)) {
- return true;
- }
- }
- for (Mutable<ILogicalOperator> childRef : currentOp.getInputs()) {
- if (containsDataScanInDescendantsAndSelf(childRef.getValue())) {
- return true;
- }
- }
- return false;
- }
+ // Recursively applies this rule to the nested plan of the subplan operator,
+ // for the case where there are nested subplan operators within {@code subplanOp}.
+ Pair<Boolean, Map<LogicalVariable, LogicalVariable>> result = rewriteSubplanOperator(
+ subplanOp.getNestedPlans().get(0).getRoots().get(0), context);
- /**
- * Find the header variables that can imply all the variables in {@code liveVars}
- *
- * @param fds,
- * a list of functional dependencies
- * @param liveVars,
- * a set of live variables
- * @return a set of covering variables that can imply all live variables.
- */
- private Set<LogicalVariable> findFDHeaderVariables(List<FunctionalDependency> fds, Set<LogicalVariable> liveVars) {
- Set<LogicalVariable> key = new HashSet<>();
- Set<LogicalVariable> cover = new HashSet<>();
- for (FunctionalDependency fd : fds) {
- key.addAll(fd.getHead());
- cover.addAll(fd.getTail());
- }
- if (cover.equals(liveVars)) {
- return key;
- } else {
- return liveVars;
+ Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>> notNullVarsAndTopJoinRef = SubplanFlatteningUtil
+ .inlineLeftNtsInSubplanJoin(subplanOp, context);
+ if (notNullVarsAndTopJoinRef.first == null) {
+ return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(false, replacedVarMap);
}
- }
- /***
- * Deals with operators that are not SubplanOperator.
- *
- * @param op
- * the operator to consider
- * @param context
- * @return
- * @throws AlgebricksException
- */
- private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> traverseNonSubplanOperator(ILogicalOperator op,
- IOptimizationContext context) throws AlgebricksException {
+ Set<LogicalVariable> notNullVars = notNullVarsAndTopJoinRef.first;
+ Mutable<ILogicalOperator> topJoinRef = notNullVarsAndTopJoinRef.second;
+
+ // Gets live variables and covering variables from the subplan's input operator.
+ Set<LogicalVariable> fdCoveringVars = EquivalenceClassUtils.findFDHeaderVariables(context, inputOp);
Set<LogicalVariable> liveVars = new HashSet<>();
- VariableUtilities.getLiveVariables(op, liveVars);
- Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<LogicalVariable, LogicalVariable>();
- Map<LogicalVariable, LogicalVariable> replacedVarMapForAncestor = new HashMap<LogicalVariable, LogicalVariable>();
- boolean changed = false;
- for (Mutable<ILogicalOperator> childrenRef : op.getInputs()) {
- Pair<Boolean, Map<LogicalVariable, LogicalVariable>> resultFromChild = rewriteSubplanOperator(childrenRef,
- context);
- changed = changed || resultFromChild.first;
- for (Map.Entry<LogicalVariable, LogicalVariable> entry : resultFromChild.second.entrySet()) {
- if (liveVars.contains(entry.getKey())) {
- // Only needs to map live variables for its ancestors.
- replacedVarMapForAncestor.put(entry.getKey(), entry.getValue());
- }
- }
- replacedVarMap.putAll(resultFromChild.second);
- }
- for (Map.Entry<LogicalVariable, LogicalVariable> entry : replacedVarMap.entrySet()) {
- VariableUtilities.substituteVariables(op, entry.getKey(), entry.getValue(), context);
- }
- context.computeAndSetTypeEnvironmentForOperator(op);
- return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(changed, replacedVarMapForAncestor);
- }
+ VariableUtilities.getLiveVariables(inputOp, liveVars);
- /**
- * Inline a subplan's input to replace the NTSs.
- *
- * @param subplanOp
- * the subplan operator
- * @param inputOperator
- * the input operator to the subplan
- * @param context
- * the optimization context
- * @return a map that maps from the variables propagated in {@code inputOperator} to the variables
- * defined in the deeply copied query plan.
- * @throws AlgebricksException
- */
- private Map<LogicalVariable, LogicalVariable> inlineNestedTupleSource(SubplanOperator subplanOp,
- ILogicalOperator inputOperator, IOptimizationContext context) throws AlgebricksException {
- List<ILogicalPlan> nestedPlans = subplanOp.getNestedPlans();
- Map<LogicalVariable, LogicalVariable> varMap = new HashMap<LogicalVariable, LogicalVariable>();
- for (ILogicalPlan plan : nestedPlans) {
- List<Mutable<ILogicalOperator>> roots = plan.getRoots();
- for (Mutable<ILogicalOperator> root : roots) {
- varMap.putAll(replaceNestedTupleSource(root, inputOperator, context));
- }
- }
- return varMap;
- }
+ // Creates a group-by operator.
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+ GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, subplanOp.getNestedPlans());
- /**
- * Deep copy the query plan rooted at {@code inputOperator} and replace NTS with the copied plan.
- *
- * @param currentInputOpRef,
- * the current operator within a subplan
- * @param inputOperator,
- * the input operator to the subplan
- * @param context
- * the optimization context
- * @return a map that maps from the variables propagated in {@code inputOperator} to the variables
- * defined in the deeply copied query plan.
- * @throws AlgebricksException
- */
- private Map<LogicalVariable, LogicalVariable> replaceNestedTupleSource(Mutable<ILogicalOperator> currentInputOpRef,
- ILogicalOperator inputOperator, IOptimizationContext context) throws AlgebricksException {
- AbstractLogicalOperator currentOp = (AbstractLogicalOperator) currentInputOpRef.getValue();
- if (currentOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
- LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
- context);
- ILogicalOperator copiedInputOperator = deepCopyVisitor.deepCopy(inputOperator, inputOperator);
- // Updates the primary key info in the copied plan segment.
- context.updatePrimaryKeys(deepCopyVisitor.getInputToOutputVariableMapping());
- currentInputOpRef.setValue(copiedInputOperator);
- return deepCopyVisitor.getInputToOutputVariableMapping();
+ Map<LogicalVariable, LogicalVariable> gbyVarMap = new HashMap<LogicalVariable, LogicalVariable>();
+ for (LogicalVariable coverVar : fdCoveringVars) {
+ LogicalVariable newVar = context.newVar();
+ gbyVarMap.put(coverVar, newVar);
+ groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(coverVar))));
+ // Adds variables for replacements in ancestors.
+ replacedVarMap.put(coverVar, newVar);
}
- // Obtains the variable mapping from child.
- Map<LogicalVariable, LogicalVariable> varMap = new HashMap<LogicalVariable, LogicalVariable>();
- for (Mutable<ILogicalOperator> child : currentOp.getInputs()) {
- varMap.putAll(replaceNestedTupleSource(child, inputOperator, context));
+ for (LogicalVariable liveVar : liveVars) {
+ if (fdCoveringVars.contains(liveVar)) {
+ continue;
+ }
+ groupByDecorList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
}
- // Substitutes variables in the query plan rooted at currentOp.
- for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
- VariableUtilities.substituteVariables(currentOp, entry.getKey(), entry.getValue(), context);
+ groupbyOp.getInputs().add(new MutableObject<ILogicalOperator>(topJoinRef.getValue()));
+
+ // Adds a select operator into the nested plan for group-by to remove tuples with NULL on any of
+ // {@code notNullVars}, i.e., tuples that fail the null-checks built from those variables below.
+ List<Mutable<ILogicalExpression>> nullCheckExprRefs = new ArrayList<>();
+ for (LogicalVariable notNullVar : notNullVars) {
+ Mutable<ILogicalExpression> filterVarExpr = new MutableObject<ILogicalExpression>(
+ new VariableReferenceExpression(notNullVar));
+ List<Mutable<ILogicalExpression>> args = new ArrayList<Mutable<ILogicalExpression>>();
+ args.add(filterVarExpr);
+ List<Mutable<ILogicalExpression>> argsForNotFunction = new ArrayList<Mutable<ILogicalExpression>>();
+ argsForNotFunction.add(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.IS_NULL), args)));
+ nullCheckExprRefs.add(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.NOT), argsForNotFunction)));
}
- return varMap;
- }
+ Mutable<ILogicalExpression> selectExprRef = nullCheckExprRefs.size() > 1
+ ? new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+ FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.AND), nullCheckExprRefs))
+ : nullCheckExprRefs.get(0);
+ SelectOperator selectOp = new SelectOperator(selectExprRef, false, null);
+ topJoinRef.setValue(selectOp);
+ selectOp.getInputs().add(new MutableObject<ILogicalOperator>(
+ new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(groupbyOp))));
+
+ opRef.setValue(groupbyOp);
+ OperatorManipulationUtil.computeTypeEnvironmentBottomUp(groupbyOp, context);
+ VariableUtilities.substituteVariables(groupbyOp, result.second, context);
+ replacedVarMap.putAll(result.second);
+ return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(true, replacedVarMap);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
new file mode 100644
index 0000000..6d91923
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanFlatteningUtil.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+
+import com.google.common.collect.ImmutableSet;
+
+class SubplanFlatteningUtil {
+
+ /**
+ * Blindly inlines all NTSs in a subplan operator.
+ *
+ * @param subplanOp
+ * the subplan operator
+ * @param context
+ * @return a pair of: a map that maps primary key variables in the subplan's input to their deep copies
+ * in the nested pipeline; and the ordering that needs to be maintained for the final aggregation
+ * in the added group-by operator. Both are null when the inlining is skipped for a nested subplan.
+ * @throws AlgebricksException
+ */
+ public static Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>> inlineAllNestedTupleSource(
+ SubplanOperator subplanOp, IOptimizationContext context) throws AlgebricksException {
+ // For nested subplan, we do not continue for the general inlining.
+ if (OperatorManipulationUtil.ancestorOfOperators(subplanOp,
+ ImmutableSet.of(LogicalOperatorTag.NESTEDTUPLESOURCE))) {
+ return new Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>>(
+ null, null);
+ }
+ InlineAllNtsInSubplanVisitor visitor = new InlineAllNtsInSubplanVisitor(context, subplanOp);
+
+ // Rewrites the query plan below the top operator of the first nested plan.
+ ILogicalOperator topOp = subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue();
+ ILogicalOperator opToVisit = topOp.getInputs().get(0).getValue();
+ ILogicalOperator result = opToVisit.accept(visitor, null);
+ topOp.getInputs().get(0).setValue(result);
+
+ // Substitutes variables in topOp if necessary.
+ VariableUtilities.substituteVariables(topOp, visitor.getVariableMapHistory(), context);
+
+ // Gets the ordering expressions.
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> orderVars = visitor.getOrderingExpressions();
+ return new Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>>(
+ visitor.getInputVariableToOutputVariableMap(), orderVars);
+ }
+
+ /**
+ * Inlines the left NTS in a subplan that satisfies the special condition checked
+ * by {@code isQualifiedForSpecialFlattening(...)}.
+ *
+ * @param subplanOp
+ * the SubplanOperator
+ * @param context
+ * the optimization context
+ * @return a pair of: a set of variables used for further null-checks, i.e., variables indicating
+ * whether a tuple produced by a transformed left outer join is a non-match; and a reference to
+ * the top join operator in the nested subplan. Both are null if the subplan does not qualify.
+ * @throws AlgebricksException
+ */
+ public static Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>> inlineLeftNtsInSubplanJoin(
+ SubplanOperator subplanOp, IOptimizationContext context) throws AlgebricksException {
+ Pair<Boolean, ILogicalOperator> applicableAndNtsToRewrite = SubplanFlatteningUtil
+ .isQualifiedForSpecialFlattening(subplanOp);
+ if (!applicableAndNtsToRewrite.first) {
+ return new Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>>(null, null);
+ }
+
+ ILogicalOperator qualifiedNts = applicableAndNtsToRewrite.second;
+ ILogicalOperator subplanInputOp = subplanOp.getInputs().get(0).getValue();
+ InlineLeftNtsInSubplanJoinFlatteningVisitor specialVisitor = new InlineLeftNtsInSubplanJoinFlatteningVisitor(
+ context, subplanInputOp, qualifiedNts);
+
+ // Rewrites the query plan.
+ Mutable<ILogicalOperator> topRef = subplanOp.getNestedPlans().get(0).getRoots().get(0);
+ ILogicalOperator result = topRef.getValue().accept(specialVisitor, null); // The special visitor doesn't replace any input or local variables.
+ Mutable<ILogicalOperator> topJoinRef = specialVisitor.getTopJoinReference();
+ topRef.setValue(result);
+
+ // Inlines the remaining NTSs as general cases.
+ InlineAllNtsInSubplanVisitor generalVisitor = new InlineAllNtsInSubplanVisitor(context, subplanOp);
+ ILogicalOperator opToVisit = topJoinRef.getValue();
+ result = opToVisit.accept(generalVisitor, null);
+ topJoinRef.setValue(result);
+
+ // Substitutes variables in the operators above the top join operator in the nested pipeline if necessary.
+ List<Pair<LogicalVariable, LogicalVariable>> subplanLocalVarMap = generalVisitor.getVariableMapHistory();
+ ILogicalOperator currentOp = topRef.getValue();
+ while (currentOp != result) {
+ VariableUtilities.substituteVariables(currentOp, subplanLocalVarMap, context);
+ currentOp = currentOp.getInputs().get(0).getValue();
+ }
+ return new Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>>(specialVisitor.getNullCheckVariables(),
+ topJoinRef);
+ }
+
+ /**
+ * @param subplanOp a SubplanOperator
+ * @param interestedOperatorTags the operator tags to look for
+ * @return whether the nested logical plans of {@code subplanOp} contain an operator with one of those tags.
+ */
+ public static boolean containsOperators(SubplanOperator subplanOp, Set<LogicalOperatorTag> interestedOperatorTags) {
+ List<ILogicalPlan> nestedPlans = subplanOp.getNestedPlans();
+ for (ILogicalPlan nestedPlan : nestedPlans) {
+ for (Mutable<ILogicalOperator> opRef : nestedPlan.getRoots()) {
+ if (containsOperatorsInternal(opRef.getValue(), interestedOperatorTags)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Determine whether a subplan could be rewritten as a join-related special case.
+ * The conditions include:
+ * a. there is a join (let's call it J1.) in the nested plan,
+ * b. if J1 is an inner join, one input pipeline of J1 has a NestedTupleSource descendant (let's call it N1),
+ * c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1),
+ * d. there is no tuple dropping from N1 to J1.
+ *
+ * @param subplanOp
+ * the SubplanOperator to consider
+ * @return a pair of: whether the rewriting is applicable; and the qualified NTS (null if not applicable).
+ * @throws AlgebricksException
+ */
+ private static Pair<Boolean, ILogicalOperator> isQualifiedForSpecialFlattening(SubplanOperator subplanOp)
+ throws AlgebricksException {
+ if (!OperatorManipulationUtil.ancestorOfOperators(
+ subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue(),
+ // we don't need to check recursively for this special rewriting.
+ ImmutableSet.of(LogicalOperatorTag.INNERJOIN, LogicalOperatorTag.LEFTOUTERJOIN))) {
+ return new Pair<Boolean, ILogicalOperator>(false, null);
+ }
+ SubplanSpecialFlatteningCheckVisitor visitor = new SubplanSpecialFlatteningCheckVisitor();
+ for (ILogicalPlan plan : subplanOp.getNestedPlans()) {
+ for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
+ if (!opRef.getValue().accept(visitor, null)) {
+ return new Pair<Boolean, ILogicalOperator>(false, null);
+ }
+ }
+ }
+ return new Pair<Boolean, ILogicalOperator>(true, visitor.getQualifiedNts());
+ }
+
+ /**
+ * Whether the query plan rooted at {@code currentOp} contains an operator whose tag is in
+ * {@code interestedOperatorTags}, also searching the nested plans of subplan operators.
+ *
+ * @param currentOp the current operator
+ * @param interestedOperatorTags the operator tags to look for
+ * @return true if such an operator is found in {@code currentOp} or its descendants; false otherwise.
+ */
+ private static boolean containsOperatorsInternal(ILogicalOperator currentOp,
+ Set<LogicalOperatorTag> interestedOperatorTags) {
+ if (interestedOperatorTags.contains(currentOp.getOperatorTag())) {
+ return true;
+ }
+ if (currentOp.getOperatorTag() == LogicalOperatorTag.SUBPLAN) {
+ if (containsOperators((SubplanOperator) currentOp, interestedOperatorTags)) {
+ return true;
+ }
+ }
+ for (Mutable<ILogicalOperator> childRef : currentOp.getInputs()) {
+ if (containsOperatorsInternal(childRef.getValue(), interestedOperatorTags)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
new file mode 100644
index 0000000..1a2e8bb
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/**
+ * This visitor determines whether a nested subplan in a SubplanOperator could be rewritten as
+ * a join-related special case.
+ * The conditions include:
+ * a. there is a join (let's call it J1.) in the nested plan,
+ * b. if J1 is an inner join, one input pipeline of J1 has a NestedTupleSource descendant (let's call it N1),
+ * c. if J1 is a left outer join, the left branch of J1 has a NestedTupleSource descendant (let's call it N1),
+ * d. there is no tuple dropping from N1 to J1.
+ */
+class SubplanSpecialFlatteningCheckVisitor implements IQueryOperatorVisitor<Boolean, Void> {
+ // Accept with an assumption that there doesn't exist an ancestor operator like J1 of the visiting
+ // tuple discarding or cardinality reducing operator.
+ // That is, a tuple discarding or cardinality operator like SelectOperator could not be
+ // on the path from N1 to J1, but could be on top of J1.
+ // For instance, during the post order visiting, if we hit a SelectOperator, we set rejectPending
+ // bit to be true, and later if we backtrack to J1 which is an ancestor of N1, we think the path
+ // from N1 to J1 is not valid because condition d is not met.
+ // Then we reset rejectPending to false and traverse another child of J1.
+ private boolean rejectPending = false;
+
+ // The qualified NTS to be replaced.
+ private ILogicalOperator qualifiedNtsOp;
+
+ public ILogicalOperator getQualifiedNts() {
+ return qualifiedNtsOp;
+ }
+
+ @Override
+ public Boolean visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+ return visitCardinalityReduceOperator(op);
+ }
+
+ @Override
+ public Boolean visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) throws AlgebricksException {
+ return visitCardinalityReduceOperator(op);
+ }
+
+ @Override
+ public Boolean visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+ return visitCardinalityReduceOperator(op);
+ }
+
+ @Override
+ public Boolean visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ return visitTupleDiscardingOperator(op);
+ }
+
+ @Override
+ public Boolean visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+ if (childRef.getValue().accept(this, null) && !rejectPending) {
+ return true;
+ }
+ rejectPending = false;
+ }
+ return false;
+ }
+
+ @Override
+ public Boolean visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+ // Check whether the left branch is a qualified branch.
+ boolean isLeftBranchQualified = op.getInputs().get(0).getValue().accept(this, null);
+ return !rejectPending && isLeftBranchQualified;
+ }
+
+ @Override
+ public Boolean visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) throws AlgebricksException {
+ qualifiedNtsOp = op;
+ return true;
+ }
+
+ @Override
+ public Boolean visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ return visitTupleDiscardingOperator(op);
+ }
+
+ @Override
+ public Boolean visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ // Flattening with an Union Operator in the pipeline will perturb the query semantics,
+ // e.g., the result cardinality can change.
+ return false;
+ }
+
+ @Override
+ public Boolean visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ return false;
+ }
+
+ @Override
+ public Boolean visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ @Override
+ public Boolean visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ return visitInputs(op);
+ }
+
+ private boolean visitInputs(ILogicalOperator op) throws AlgebricksException {
+ for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+ if (childRef.getValue().accept(this, null)) {
+ // One input is qualified.
+ return true;
+ }
+ }
+ // All inputs are disqualified.
+ return false;
+ }
+
+ /**
+ * If an operator reduces its input cardinality, the operator should not be a descendant
+ * of a join operator.
+ *
+ * @param op,
+ * the operator to consider
+ * @return FALSE if it is certainly disqualified; TRUE otherwise.
+ * @throws AlgebricksException
+ */
+ private Boolean visitCardinalityReduceOperator(ILogicalOperator op) throws AlgebricksException {
+ return visitTupleDiscardingOrCardinalityReduceOperator(op);
+ }
+
+ /**
+ * If an operator discard tuples variables before the join, the query's
+ * semantics cannot be preserved after applying the <code>FlatternSubplanJoinRule</code> rule.
+ *
+ * @param op,
+ * the operator to consider
+ * @return FALSE if it is certainly disqualified; TRUE otherwise.
+ * @throws AlgebricksException
+ */
+ private Boolean visitTupleDiscardingOperator(ILogicalOperator op) throws AlgebricksException {
+ return visitTupleDiscardingOrCardinalityReduceOperator(op);
+ }
+
+ private boolean visitTupleDiscardingOrCardinalityReduceOperator(ILogicalOperator op) throws AlgebricksException {
+ boolean result = visitInputs(op);
+ rejectPending = true;
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
index 7b29ba3..9f98da0 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
@@ -20,8 +20,10 @@
package org.apache.asterix.optimizer.rules.util;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.lang.common.util.FunctionUtil;
@@ -31,6 +33,8 @@ import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
@@ -39,8 +43,13 @@ import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
import org.mortbay.util.SingletonList;
public class EquivalenceClassUtils {
@@ -99,4 +108,51 @@ public class EquivalenceClassUtils {
}
}
+ /**
+  * Find the header variables that can imply all subplan-local live variables at <code>operator</code>.
+  * <p>
+  * The functional dependencies of <code>operator</code> are computed and then cleared from
+  * <code>context</code> again. The live head variables of the dependencies are accumulated
+  * until the accumulated tails cover every subplan-local live variable. If the dependencies never
+  * cover all live variables, the method falls back to the live variables whose type tag is
+  * not RECORD, ORDEREDLIST, or UNORDEREDLIST.
+  *
+  * @param context
+  *            the optimization context.
+  * @param operator
+  *            the operator of interest.
+  * @return a set of covering variables that can imply all subplan-local live variables at <code>operator</code>.
+  * @throws AlgebricksException
+  */
+ public static Set<LogicalVariable> findFDHeaderVariables(IOptimizationContext context, ILogicalOperator operator)
+         throws AlgebricksException {
+     PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses((AbstractLogicalOperator) operator, context);
+     List<FunctionalDependency> fds = context.getFDList(operator);
+     context.clearAllFDAndEquivalenceClasses();
+
+     Set<LogicalVariable> liveVars = new HashSet<>();
+     VariableUtilities.getSubplanLocalLiveVariables(operator, liveVars);
+
+     Set<LogicalVariable> key = new HashSet<>();
+     Set<LogicalVariable> cover = new HashSet<>();
+     for (FunctionalDependency fd : fds) {
+         // Keep only the head variables that are live, WITHOUT calling retainAll on the list
+         // handed out by fd.getHead().
+         // NOTE(review): getHead() appears to expose the dependency's own list; filtering into
+         // "key" avoids editing that list in place (and avoids an UnsupportedOperationException
+         // should the list be unmodifiable) -- confirm against FunctionalDependency.
+         for (LogicalVariable headVar : fd.getHead()) {
+             if (liveVars.contains(headVar)) {
+                 key.add(headVar);
+             }
+         }
+         cover.addAll(fd.getTail());
+         if (cover.containsAll(liveVars)) {
+             return key;
+         }
+     }
+     // Also reached when there is no functional dependency at all; an empty set of live
+     // variables is trivially covered.
+     if (cover.containsAll(liveVars)) {
+         return key;
+     }
+
+     // The functional dependencies do not cover all live variables: fall back to the live
+     // variables whose types are neither records nor lists.
+     IVariableTypeEnvironment env = context.getOutputTypeEnvironment(operator);
+     Set<LogicalVariable> keyVars = new HashSet<>();
+     for (LogicalVariable var : liveVars) {
+         IAType type = (IAType) env.getVarType(var);
+         ATypeTag typeTag = type.getTypeTag();
+         if (typeTag == ATypeTag.RECORD || typeTag == ATypeTag.ORDEREDLIST
+                 || typeTag == ATypeTag.UNORDEREDLIST) {
+             continue;
+         }
+         keyVars.add(var);
+     }
+     return keyVars;
+ }
}