You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/01/28 05:46:46 UTC
[7/7] incubator-asterixdb git commit: ASTERIXDB-1005,
ASTERIXDB-1263: Clean up subplan flattening: 1. Inline
NestedTupleSource and remove SubplanOperator for special cases in which join
operators inside the SubplanOperator can be re-targeted for correlations;
ASTERIXDB-1005, ASTERIXDB-1263: Clean up subplan flattening:
1. Inline NestedTupleSource and remove SubplanOperator for special cases in which
join operators inside the SubplanOperator can be re-targeted for
correlations;
2. Blindly inline NestedTupleSource and remove SubplanOperator for general cases
where the conditions of the special case are not met.
Change-Id: I4dd130a25f3c81272cc23f844ea20e376e990612
Reviewed-on: https://asterix-gerrit.ics.uci.edu/579
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/947fc3cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/947fc3cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/947fc3cb
Branch: refs/heads/master
Commit: 947fc3cb209a6e7e37b435ea35f0d856f3c35dc7
Parents: bf34d18
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Wed Jan 27 20:14:47 2016 -0800
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Wed Jan 27 20:42:09 2016 -0800
----------------------------------------------------------------------
.../asterix/optimizer/base/RuleCollections.java | 4 -
.../CancelUnnestWithNestedListifyRule.java | 8 +
.../subplan/InlineAllNtsInSubplanVisitor.java | 705 +++++++++++++++++++
...neLeftNtsInSubplanJoinFlatteningVisitor.java | 413 +++++++++++
...ineSubplanInputForNestedTupleSourceRule.java | 698 ++++++++----------
.../rules/subplan/SubplanFlatteningUtil.java | 203 ++++++
.../SubplanSpecialFlatteningCheckVisitor.java | 269 +++++++
.../rules/util/EquivalenceClassUtils.java | 56 ++
asterix-app/data/restaurants/restaurants.adm | 30 +
.../optimizerts/queries/nested_loj2.aql | 2 +-
.../optimizerts/queries/nested_loj3.aql | 2 +-
.../optimizerts/queries/nested_loj4.aql | 79 +++
.../queries/query-ASTERIXDB-1005.aql | 59 ++
.../queries/query-ASTERIXDB-1263.aql | 53 ++
...join-probe-pidx-with-join-btree-sidx_01.plan | 63 +-
...join-probe-pidx-with-join-btree-sidx_02.plan | 63 +-
.../optimizerts/results/fj-phase1.plan | 52 +-
.../results/inverted-index-join/issue741.plan | 79 +--
...dx-with-join-edit-distance-check-idx_01.plan | 139 ++--
...obe-pidx-with-join-jaccard-check-idx_01.plan | 87 ++-
.../optimizerts/results/loj-super-key_01.plan | 10 +-
.../optimizerts/results/loj-super-key_02.plan | 8 +-
.../optimizerts/results/nest_aggregate.plan | 109 ++-
...join-probe-pidx-with-join-btree-sidx_01.plan | 77 +-
...join-probe-pidx-with-join-btree-sidx_02.plan | 77 +-
...dx-with-join-edit-distance-check-idx_01.plan | 171 +++--
...obe-pidx-with-join-jaccard-check-idx_01.plan | 105 ++-
...join-probe-pidx-with-join-rtree-sidx_01.plan | 14 +-
...join-probe-pidx-with-join-rtree-sidx_02.plan | 14 +-
...in-probe-pidx-with-join-btree-sidx_01_1.plan | 77 +-
...in-probe-pidx-with-join-btree-sidx_01_2.plan | 77 +-
...in-probe-pidx-with-join-btree-sidx_02_1.plan | 77 +-
...in-probe-pidx-with-join-btree-sidx_02_2.plan | 77 +-
...dx-with-join-edit-distance-check-idx_01.plan | 171 +++--
...join-probe-pidx-with-join-rtree-sidx_01.plan | 14 +-
...join-probe-pidx-with-join-rtree-sidx_02.plan | 14 +-
.../optimizerts/results/nested_loj2.plan | 26 +-
.../optimizerts/results/nested_loj3.plan | 36 +-
.../optimizerts/results/nested_loj4.plan | 39 +
...in-probe-pidx-with-join-btree-sidx_01_1.plan | 63 +-
...in-probe-pidx-with-join-btree-sidx_01_2.plan | 63 +-
...in-probe-pidx-with-join-btree-sidx_02_1.plan | 63 +-
...in-probe-pidx-with-join-btree-sidx_02_2.plan | 63 +-
...dx-with-join-edit-distance-check-idx_01.plan | 139 ++--
...join-probe-pidx-with-join-rtree-sidx_01.plan | 67 +-
...join-probe-pidx-with-join-rtree-sidx_02.plan | 67 +-
.../results/push-project-through-group.plan | 49 +-
.../optimizerts/results/q08_group_by.plan | 20 +-
.../results/query-ASTERIXDB-1005.plan | 35 +
.../results/query-ASTERIXDB-1263.plan | 35 +
.../optimizerts/results/query-issue562.plan | 79 ++-
.../optimizerts/results/query_issue849-2.plan | 9 +-
.../optimizerts/results/query_issue849.plan | 4 +-
.../results/rtree-index-join/issue730.plan | 65 +-
...join-probe-pidx-with-join-rtree-sidx_01.plan | 67 +-
...join-probe-pidx-with-join-rtree-sidx_02.plan | 67 +-
.../split-materialization-above-join.plan | 211 +++---
.../results/udfs/query-ASTERIXDB-1017-2.plan | 44 +-
.../udfs/query-ASTERIXDB-1017-recursive-2.plan | 44 +-
.../udfs/query-ASTERIXDB-1017-recursive.plan | 56 +-
.../results/udfs/query-ASTERIXDB-1017.plan | 69 +-
.../results/udfs/query-ASTERIXDB-1018.plan | 119 ++--
.../results/udfs/query-ASTERIXDB-1019.plan | 119 ++--
.../results/udfs/query-ASTERIXDB-1020.plan | 83 ++-
.../results/udfs/query-ASTERIXDB-1029.plan | 85 +--
.../results/udfs/query-ASTERIXDB-1029_2.plan | 85 +--
.../query-ASTERIXDB-1063.23.query.aql | 29 +
.../nest_aggregate2/nest_aggregate2.1.ddl.aql | 88 +++
.../nest_aggregate2.2.update.aql | 50 ++
.../nest_aggregate2/nest_aggregate2.3.query.aql | 45 ++
.../query-ASTERIXDB-1005.1.ddl.sqlpp | 53 ++
.../query-ASTERIXDB-1005.2.update.sqlpp | 31 +
.../query-ASTERIXDB-1005.3.query.sqlpp | 31 +
.../query-ASTERIXDB-1063.23.query.sqlpp | 30 +
.../nest_aggregate2/nest_aggregate2.1.ddl.sqlpp | 94 +++
.../nest_aggregate2.2.update.sqlpp | 40 ++
.../nest_aggregate2.3.query.sqlpp | 46 ++
.../query-ASTERIXDB-1005.1.adm | 15 +
.../overlap_bins_gby_3/overlap_bins_gby_3.1.adm | 84 +--
.../tinysocial-suite/tinysocial-suite.23.adm | 1 +
.../tpch/nest_aggregate2/nest_aggregate2.1.adm | 11 +
.../query-ASTERIXDB-1263.23.ast | 61 ++
.../src/test/resources/runtimets/testsuite.xml | 5 +
.../resources/runtimets/testsuite_sqlpp.xml | 10 +
.../metadata/feeds/FeedMetadataUtil.java | 2 +-
85 files changed, 4682 insertions(+), 2171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 260fb3c..4187059 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -118,8 +118,6 @@ import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateSubplanRule;
import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateSubplanWithInputCardinalityOneRule;
-import org.apache.hyracks.algebricks.rewriter.rules.subplan.IntroduceGroupByForSubplanRule;
-import org.apache.hyracks.algebricks.rewriter.rules.subplan.IntroduceLeftOuterJoinForSubplanRule;
import org.apache.hyracks.algebricks.rewriter.rules.subplan.NestedSubplanToJoinRule;
import org.apache.hyracks.algebricks.rewriter.rules.subplan.PushSubplanIntoGroupByRule;
import org.apache.hyracks.algebricks.rewriter.rules.subplan.SubplanOutOfGroupRule;
@@ -190,9 +188,7 @@ public final class RuleCollections {
condPushDownAndJoinInference.add(new IntroJoinInsideSubplanRule());
condPushDownAndJoinInference.add(new PushMapOperatorDownThroughProductRule());
condPushDownAndJoinInference.add(new PushSubplanWithAggregateDownThroughProductRule());
- condPushDownAndJoinInference.add(new IntroduceGroupByForSubplanRule());
condPushDownAndJoinInference.add(new SubplanOutOfGroupRule());
- condPushDownAndJoinInference.add(new IntroduceLeftOuterJoinForSubplanRule());
condPushDownAndJoinInference.add(new AsterixExtractFunctionsFromJoinConditionRule());
condPushDownAndJoinInference.add(new RemoveRedundantVariablesRule());
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
index 558fa88..2de6bc4 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
@@ -49,8 +49,11 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat
import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
import org.apache.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import com.google.common.collect.ImmutableSet;
+
/**
* This rule cancels the unnest with the nested listify. Formally, the following plan<br/>
*
@@ -195,6 +198,11 @@ public class CancelUnnestWithNestedListifyRule implements IAlgebraicRewriteRule
return false;
}
+ if (OperatorManipulationUtil.ancestorOfOperators(agg, ImmutableSet.of(LogicalOperatorTag.LIMIT,
+ LogicalOperatorTag.ORDER, LogicalOperatorTag.GROUP, LogicalOperatorTag.DISTINCT))) {
+ return false;
+ }
+
LogicalVariable aggVar = agg.getVariables().get(0);
ILogicalExpression aggFun = agg.getExpressions().get(0).getValue();
if (!aggVar.equals(unnestedVar)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
new file mode 100644
index 0000000..3489982
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/*
+ This visitor inlines all nested tuple source operators in the query
+ plan rooted at the operator being visited, with a deep copy of the query
+ plan rooted at the input <code>subplanInputOperator</code>.
+
+ The visitor ensures that the variables used for correlation with the
+ query plan rooted at <code>subplanInputOperator</code> are propagated
+ to the operator being visited.
+
+ ----------------------------------
+ Here is an abstract example.
+ The original query plan:
+ --Op1
+ --Subplan{
+ --AggregateOp
+ --NestedOp
+ .....
+ --Nested-Tuple-Source
+ }
+ --InputOp
+ .....
+
+ After we call NestedOp.accept(....) with this visitor, we will get an
+ intermediate plan that looks like:
+ --Op1
+ --Subplan{
+ --AggregateOp
+ --NestedOp
+ .....
+ --InputOp'
+ ....
+ }
+ --InputOp
+ .....
+ The plan rooted at InputOp' is a deep copy of the plan rooted at InputOp
+ with a different set of variables.
+
+ */
+class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOperator, Void> {
+    // The optimization context.
+    private final IOptimizationContext context;
+
+    // The target SubplanOperator.
+    private final ILogicalOperator subplanOperator;
+
+    // The input operator to the subplan.
+    private final ILogicalOperator subplanInputOperator;
+
+    // Maps live variables at <code>subplanInputOperator</code> to variables in the flattened nested plan.
+    private final Map<LogicalVariable, LogicalVariable> subplanInputVarToCurrentVarMap = new HashMap<>();
+
+    // Maps variables in the flattened nested plan to live variables at <code>subplanInputOperator</code>
+    // (the reverse direction of subplanInputVarToCurrentVarMap).
+    private final Map<LogicalVariable, LogicalVariable> currentVarToSubplanInputVarMap = new HashMap<>();
+
+    // The set of key variables at the current operator that is being visited.
+    private final Set<LogicalVariable> correlatedKeyVars = new HashSet<>();
+
+    // The (order, expression) pairs remembered from the most recently rewritten order operator;
+    // they determine the ordering re-applied when a limit operator is rewritten.
+    private final List<Pair<IOrder, Mutable<ILogicalExpression>>> orderingExprs = new ArrayList<>();
+
+    // Pairs of variables recorded as the variable-mapping history of the rewriting;
+    // exposed through getVariableMapHistory().
+    private final List<Pair<LogicalVariable, LogicalVariable>> varMapIntroducedByRewriting = new ArrayList<>();
+
+    /**
+     * Creates a visitor that inlines the nested tuple sources of the given subplan operator.
+     *
+     * @param context
+     *            the optimization context
+     * @param subplanOperator
+     *            the target subplan operator whose nested tuple sources are to be inlined; its first input
+     *            is used as the subplan input operator
+     * @throws AlgebricksException
+     *             declared by the signature; the current body does not throw it
+     */
+    public InlineAllNtsInSubplanVisitor(IOptimizationContext context, ILogicalOperator subplanOperator)
+            throws AlgebricksException {
+        this.context = context;
+        this.subplanOperator = subplanOperator;
+        this.subplanInputOperator = subplanOperator.getInputs().get(0).getValue();
+    }
+
+    /**
+     * @return the live (not copied) mapping from variables at the subplan input operator to their
+     *         counterparts in the flattened nested plan
+     */
+    public Map<LogicalVariable, LogicalVariable> getInputVariableToOutputVariableMap() {
+        return this.subplanInputVarToCurrentVarMap;
+    }
+
+    /**
+     * @return the live (not copied) list of variable pairs recorded during the rewriting
+     */
+    public List<Pair<LogicalVariable, LogicalVariable>> getVariableMapHistory() {
+        return this.varMapIntroducedByRewriting;
+    }
+
+    /**
+     * @return the live (not copied) list of remembered (order, expression) pairs
+     */
+    public List<Pair<IOrder, Mutable<ILogicalExpression>>> getOrderingExpressions() {
+        return this.orderingExprs;
+    }
+
+    @Override
+    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        // The Void argument is unused; aggregate and running-aggregate operators share one code path.
+        final ILogicalOperator rewritten = visitAggregateOperator(op);
+        return rewritten;
+    }
+
+    @Override
+    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
+            throws AlgebricksException {
+        // Same handling as a plain aggregate operator.
+        final ILogicalOperator rewritten = visitAggregateOperator(op);
+        return rewritten;
+    }
+
+    @Override
+    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        // Handled by the generic single-input path.
+        final ILogicalOperator rewritten = visitSingleInputOperator(op);
+        return rewritten;
+    }
+
+    /**
+     * Visits a group-by operator inside the nested plan. After its input is processed, every correlated key
+     * variable that is not already a group-by key is added as one, so that groups are formed separately for
+     * each combination of correlated key values.
+     *
+     * @return the same group-by operator, modified in place
+     */
+    @Override
+    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        Set<LogicalVariable> groupKeyVars = new HashSet<>();
+        // Maps group by key variables if the corresponding expressions are VariableReferenceExpressions.
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> keyVarExprRef : op.getGroupByList()) {
+            ILogicalExpression expr = keyVarExprRef.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+                LogicalVariable sourceVar = varExpr.getVariableReference();
+                updateInputToOutputVarMapping(sourceVar, keyVarExprRef.first, false);
+                // Collects the group-by *output* variable (left side of the pair), not sourceVar.
+                groupKeyVars.add(keyVarExprRef.first);
+            }
+        }
+
+        // Add correlated key variables into group-by keys.
+        // NOTE(review): this membership test compares correlated key variables with group-by output
+        // variables; it can only match if updateInputToOutputVarMapping swapped a correlated key for its
+        // group-by output variable in the loop above — confirm.
+        Map<LogicalVariable, LogicalVariable> addedGroupKeyMapping = new HashMap<>();
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            if (!groupKeyVars.contains(keyVar)) {
+                LogicalVariable newVar = context.newVar();
+                op.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyVar))));
+                addedGroupKeyMapping.put(keyVar, newVar);
+            }
+        }
+
+        // Updates decor list.
+        // Removes decor entries whose expression is a correlated key variable; every correlated key that was
+        // not already a group-by key has just been added as one above.
+        Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorExprIter = op.getDecorList().iterator();
+        while (decorExprIter.hasNext()) {
+            ILogicalExpression expr = decorExprIter.next().second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+                if (correlatedKeyVars.contains(varExpr.getVariableReference())) {
+                    decorExprIter.remove();
+                }
+            }
+        }
+
+        // Updates the var mapping for added group-by keys.
+        // NOTE(review): presumably kept in a separate loop so that correlatedKeyVars is not modified while the
+        // loop above iterates over it.
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : addedGroupKeyMapping.entrySet()) {
+            updateInputToOutputVarMapping(entry.getKey(), entry.getValue(), false);
+        }
+        return op;
+    }
+
+    /**
+     * Rewrites a limit operator inside the nested plan so that the limit is applied per correlated key
+     * rather than over all tuples. The limit's input tuples have their non-key variables packed into a record.
+     * The limit is wrapped into a group-by on the correlated key variables, whose nested plan listifies the
+     * records. The list is then unnested, and the record is unpacked back into variables by field accesses.
+     *
+     * @return the field-access assign operator that replaces {@code op}, or {@code op} itself when there are
+     *         no correlated key variables
+     */
+    @Override
+    public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        // Processes its input operator.
+        visitSingleInputOperator(op);
+        if (correlatedKeyVars.isEmpty()) {
+            // Nothing to correlate on; keep the limit as it is.
+            return op;
+        }
+
+        // Get live variables before limit.
+        Set<LogicalVariable> inputLiveVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getSubplanLocalLiveVariables(op.getInputs().get(0).getValue(), inputLiveVars);
+
+        // Snapshots the non-key live variables before any rewriting. wrapLimitInGroupBy(...) below calls
+        // updateInputToOutputVarMapping(...) for every correlated key variable. If that remaps the entries of
+        // correlatedKeyVars, re-reading correlatedKeyVars afterwards would no longer exclude the original key
+        // variables. Field accesses would then be generated for fields that were never put into the record.
+        // Using one snapshot for both packing and unpacking keeps the two sides in sync.
+        Set<LogicalVariable> nonKeyLiveVars = new HashSet<>(inputLiveVars);
+        nonKeyLiveVars.removeAll(correlatedKeyVars);
+
+        // Creates a record construction assign operator.
+        Pair<ILogicalOperator, LogicalVariable> assignOpAndRecordVar =
+                createRecordConstructorAssignOp(nonKeyLiveVars);
+        ILogicalOperator assignOp = assignOpAndRecordVar.first;
+        LogicalVariable recordVar = assignOpAndRecordVar.second;
+        ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
+
+        // Rewrites limit to a group-by with limit as its nested operator.
+        Pair<ILogicalOperator, LogicalVariable> gbyOpAndAggVar = wrapLimitInGroupBy(op, recordVar, inputLiveVars);
+        ILogicalOperator gbyOp = gbyOpAndAggVar.first;
+        LogicalVariable aggVar = gbyOpAndAggVar.second;
+        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));
+
+        // Adds an unnest operator on top of the group-by operator.
+        Pair<ILogicalOperator, LogicalVariable> unnestOpAndUnnestVar = createUnnestForAggregatedList(aggVar);
+        ILogicalOperator unnestOp = unnestOpAndUnnestVar.first;
+        LogicalVariable unnestVar = unnestOpAndUnnestVar.second;
+        unnestOp.getInputs().add(new MutableObject<ILogicalOperator>(gbyOp));
+
+        // Adds field accesses to recover the non-key input live variables.
+        ILogicalOperator fieldAccessAssignOp = createFieldAccessAssignOperator(unnestVar, nonKeyLiveVars);
+        fieldAccessAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(unnestOp));
+
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(fieldAccessAssignOp, context);
+        return fieldAccessAssignOp;
+    }
+
+    /**
+     * Builds an assign operator that packs every variable of {@code inputLiveVars} that is not a correlated
+     * key into one open record. Each field is named after the variable id, rendered as a string.
+     *
+     * @return the assign operator (without inputs) and the variable that holds the record
+     */
+    private Pair<ILogicalOperator, LogicalVariable> createRecordConstructorAssignOp(
+            Set<LogicalVariable> inputLiveVars) {
+        // Alternating (field name, field value) arguments for the record constructor.
+        List<Mutable<ILogicalExpression>> fieldNameAndValueArgs = new ArrayList<>();
+        for (LogicalVariable liveVar : inputLiveVars) {
+            if (correlatedKeyVars.contains(liveVar)) {
+                // Correlated key variables are left out of the record.
+                continue;
+            }
+            String fieldName = Integer.toString(liveVar.getId());
+            ILogicalExpression nameExpr = new ConstantExpression(new AsterixConstantValue(new AString(fieldName)));
+            ILogicalExpression valueExpr = new VariableReferenceExpression(liveVar);
+            fieldNameAndValueArgs.add(new MutableObject<ILogicalExpression>(nameExpr));
+            fieldNameAndValueArgs.add(new MutableObject<ILogicalExpression>(valueExpr));
+        }
+        LogicalVariable recordVar = context.newVar();
+        ScalarFunctionCallExpression recordCtor = new ScalarFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR),
+                fieldNameAndValueArgs);
+        AssignOperator packingAssign = new AssignOperator(recordVar,
+                new MutableObject<ILogicalExpression>(recordCtor));
+        return new Pair<ILogicalOperator, LogicalVariable>(packingAssign, recordVar);
+    }
+
+    /**
+     * Wraps the given limit operator into a new group-by operator whose keys are fresh variables bound to the
+     * correlated key variables. The group-by's nested plan is
+     * aggregate(LISTIFY(recordVar)) <- limit [<- order] <- nested-tuple-source, where the order operator is only
+     * added when orderingExprs is non-empty.
+     *
+     * @param op
+     *            the limit operator to wrap; its inputs are cleared and re-pointed inside the nested plan
+     * @param recordVar
+     *            the variable holding the packed record of each input tuple
+     * @param inputLiveVars
+     *            not used by this method
+     * @return the new group-by operator (without inputs) and the variable holding the listified records
+     * @throws AlgebricksException
+     *             propagated from the calls made by this method
+     */
+    private Pair<ILogicalOperator, LogicalVariable> wrapLimitInGroupBy(ILogicalOperator op, LogicalVariable recordVar,
+            Set<LogicalVariable> inputLiveVars) throws AlgebricksException {
+        GroupByOperator gbyOp = new GroupByOperator();
+        List<Pair<LogicalVariable, LogicalVariable>> keyVarNewVarPairs = new ArrayList<>();
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            // This restricts the visitor to nested logical plans inside a Subplan operator, where the
+            // correlated key variables form a candidate key that uniquely identifies a tuple coming out of the
+            // nested-tuple-source.
+            LogicalVariable newVar = context.newVar();
+            gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyVar))));
+            keyVarNewVarPairs.add(new Pair<LogicalVariable, LogicalVariable>(keyVar, newVar));
+        }
+
+        // Creates an aggregate operator doing LISTIFY, as the root of the nested plan of the added group-by operator.
+        List<LogicalVariable> aggVarList = new ArrayList<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> aggExprList = new ArrayList<Mutable<ILogicalExpression>>();
+        LogicalVariable aggVar = context.newVar();
+        List<Mutable<ILogicalExpression>> aggArgList = new ArrayList<>();
+        aggVarList.add(aggVar);
+        // Creates an aggregation function expression.
+        aggArgList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(recordVar)));
+        ILogicalExpression aggExpr = new AggregateFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false, aggArgList);
+        aggExprList.add(new MutableObject<ILogicalExpression>(aggExpr));
+        AggregateOperator aggOp = new AggregateOperator(aggVarList, aggExprList);
+
+        // Adds the original limit operator as the input operator to the added aggregate operator.
+        aggOp.getInputs().add(new MutableObject<ILogicalOperator>(op));
+        op.getInputs().clear();
+        ILogicalOperator currentOp = op;
+        // Re-applies the ordering remembered by visitOrderOperator below the limit, so the limit keeps
+        // operating on ordered input within each group.
+        if (!orderingExprs.isEmpty()) {
+            OrderOperator orderOp = new OrderOperator(cloneOrderingExpression(orderingExprs));
+            op.getInputs().add(new MutableObject<ILogicalOperator>(orderOp));
+            currentOp = orderOp;
+        }
+
+        // Adds a nested tuple source operator as the input operator to the limit operator.
+        // (When an order operator was added above, the nested tuple source feeds that order operator instead.)
+        NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
+        currentOp.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+
+        // Sets the root of the added nested plan to the aggregate operator.
+        ILogicalPlan nestedPlan = new ALogicalPlanImpl();
+        nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(aggOp));
+
+        // Sets the nested plan for the added group-by operator.
+        gbyOp.getNestedPlans().add(nestedPlan);
+
+        // Updates variable mapping for ancestor operators.
+        for (Pair<LogicalVariable, LogicalVariable> keyVarNewVar : keyVarNewVarPairs) {
+            updateInputToOutputVarMapping(keyVarNewVar.first, keyVarNewVar.second, false);
+        }
+        return new Pair<ILogicalOperator, LogicalVariable>(gbyOp, aggVar);
+    }
+
+    /**
+     * Builds an unnest operator that iterates, via SCAN_COLLECTION, over the list held by {@code aggVar}.
+     *
+     * @return the unnest operator (without inputs) and the fresh variable bound to each list item
+     */
+    private Pair<ILogicalOperator, LogicalVariable> createUnnestForAggregatedList(LogicalVariable aggVar) {
+        LogicalVariable itemVar = context.newVar();
+        List<Mutable<ILogicalExpression>> scanArgs = new ArrayList<Mutable<ILogicalExpression>>();
+        scanArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(aggVar)));
+        UnnestingFunctionCallExpression scanCollection = new UnnestingFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), scanArgs);
+        ILogicalOperator unnest = new UnnestOperator(itemVar,
+                new MutableObject<ILogicalExpression>(scanCollection));
+        return new Pair<ILogicalOperator, LogicalVariable>(unnest, itemVar);
+    }
+
+    /**
+     * Builds an assign operator that unpacks the record in {@code recordVar} into fresh variables. It emits one
+     * FIELD_ACCESS_BY_NAME (field name = variable id) for each variable of {@code inputLiveVars} that is not
+     * currently a correlated key, and registers each original-to-fresh mapping for ancestor operators.
+     *
+     * @return the assign operator (without inputs)
+     */
+    private ILogicalOperator createFieldAccessAssignOperator(LogicalVariable recordVar,
+            Set<LogicalVariable> inputLiveVars) {
+        List<LogicalVariable> unpackedVars = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> accessExprs = new ArrayList<>();
+        for (LogicalVariable liveVar : inputLiveVars) {
+            if (correlatedKeyVars.contains(liveVar)) {
+                continue;
+            }
+            LogicalVariable unpackedVar = context.newVar();
+            unpackedVars.add(unpackedVar);
+            List<Mutable<ILogicalExpression>> accessArgs = new ArrayList<>();
+            accessArgs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(recordVar)));
+            accessArgs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+                    new AsterixConstantValue(new AString(Integer.toString(liveVar.getId()))))));
+            accessExprs.add(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), accessArgs)));
+            // Lets ancestor operators refer to the unpacked variable instead of the original one.
+            updateInputToOutputVarMapping(liveVar, unpackedVar, false);
+        }
+        return new AssignOperator(unpackedVars, accessExprs);
+    }
+
+    @Override
+    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        // Joins go through the shared multi-input handling.
+        final ILogicalOperator rewritten = visitMultiInputOperator(op);
+        return rewritten;
+    }
+
+    @Override
+    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        // Same multi-input handling as an inner join.
+        final ILogicalOperator rewritten = visitMultiInputOperator(op);
+        return rewritten;
+    }
+
+    /**
+     * Replaces a nested tuple source that refers to the target subplan operator with a deep copy, using new
+     * variables, of the plan rooted at the subplan input operator. Nested tuple sources that refer to any other
+     * operator are returned unchanged.
+     *
+     * @return the root of the copied plan, or {@code op} itself when it does not belong to the target subplan
+     */
+    @Override
+    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        // Only nested tuple sources of the target subplan are inlined (reference comparison).
+        if (op.getDataSourceReference().getValue() != subplanOperator) {
+            return op;
+        }
+        LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
+                context);
+        ILogicalOperator copiedInputOperator = deepCopyVisitor.deepCopy(subplanInputOperator, null);
+
+        // Updates the primary key info in the copied plan segment.
+        Map<LogicalVariable, LogicalVariable> varMap = deepCopyVisitor.getInputToOutputVariableMapping();
+        context.updatePrimaryKeys(varMap);
+
+        // Resets the correlated keys to the variables returned by findFDHeaderVariables for the original
+        // subplan input operator; they are translated to their copies in the loop below.
+        correlatedKeyVars.clear();
+        correlatedKeyVars.addAll(EquivalenceClassUtils.findFDHeaderVariables(context, subplanInputOperator));
+        // Update key variables and input-output-var mapping.
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+            LogicalVariable oldVar = entry.getKey();
+            LogicalVariable newVar = entry.getValue();
+            if (correlatedKeyVars.contains(oldVar)) {
+                correlatedKeyVars.remove(oldVar);
+                correlatedKeyVars.add(newVar);
+            }
+            // inNts = true: this mapping originates from inlining the nested tuple source.
+            updateInputToOutputVarMapping(oldVar, newVar, true);
+        }
+        return copiedInputOperator;
+    }
+
+    /**
+     * Visits an order operator. When there are no correlated key variables, the operator is returned as is.
+     * Otherwise its ordering is remembered in orderingExprs (as a clone), and it is replaced by a new order
+     * operator that sorts first by every correlated key variable (ascending), then by the original ordering.
+     *
+     * @return {@code op} or the replacement order operator
+     */
+    @Override
+    public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        if (!correlatedKeyVars.isEmpty()) {
+            orderingExprs.clear();
+            orderingExprs.addAll(cloneOrderingExpression(op.getOrderExpressions()));
+
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> keyPrefixedOrdering = new ArrayList<>();
+            for (LogicalVariable keyVar : correlatedKeyVars) {
+                Mutable<ILogicalExpression> keyRef = new MutableObject<ILogicalExpression>(
+                        new VariableReferenceExpression(keyVar));
+                keyPrefixedOrdering
+                        .add(new Pair<IOrder, Mutable<ILogicalExpression>>(OrderOperator.ASC_ORDER, keyRef));
+            }
+            keyPrefixedOrdering.addAll(op.getOrderExpressions());
+
+            OrderOperator keyPrefixedOrder = new OrderOperator(keyPrefixedOrdering);
+            keyPrefixedOrder.getInputs().addAll(op.getInputs());
+            context.computeAndSetTypeEnvironmentForOperator(keyPrefixedOrder);
+            return keyPrefixedOrder;
+        }
+        return op;
+    }
+
+ @Override
+ public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ visitSingleInputOperator(op);
+ List<Mutable<ILogicalExpression>> assignedExprRefs = op.getExpressions();
+ List<LogicalVariable> assignedVars = op.getVariables();
+ // Maps assigning variables if assignment expressions are VariableReferenceExpressions.
+ for (int index = 0; index < assignedVars.size(); ++index) {
+ ILogicalExpression expr = assignedExprRefs.get(index).getValue();
+ if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+ VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+ LogicalVariable sourceVar = varExpr.getVariableReference();
+ updateInputToOutputVarMapping(sourceVar, assignedVars.get(index), false);
+ }
+ }
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ /**
+ * Visits the input, then appends every correlated key variable that the projection does not
+ * already list, so the keys stay live above this operator.
+ */
+ @Override
+ public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ visitSingleInputOperator(op);
+ List<LogicalVariable> projectedVars = op.getVariables();
+ for (LogicalVariable keyVar : correlatedKeyVars) {
+ if (projectedVars.contains(keyVar)) {
+ continue;
+ }
+ projectedVars.add(keyVar);
+ }
+ return op;
+ }
+
+ /**
+ * Partitioning-split operators only need their input rewritten and their variables substituted.
+ */
+ @Override
+ public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
+ throws AlgebricksException {
+ ILogicalOperator result = visitSingleInputOperator(op);
+ return result;
+ }
+
+ @Override
+ public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ /**
+ * Script operators are rejected: this rewriting does not support them inside a subplan.
+ */
+ @Override
+ public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ final String reason = "Script operators in a subplan are not supported!";
+ throw new UnsupportedOperationException(reason);
+ }
+
+ @Override
+ public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ /**
+ * Visits all branches, then, for each variable-mapping triple of the union, records the third
+ * and then the second component as being replaced by the first component.
+ * NOTE(review): this treats triple.first as the union's output variable; confirm which component
+ * of the triple UnionAllOperator uses for its output.
+ */
+ @Override
+ public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ visitMultiInputOperator(op);
+ for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> mapping : op.getVariableMappings()) {
+ LogicalVariable target = mapping.first;
+ updateInputToOutputVarMapping(mapping.third, target, false);
+ updateInputToOutputVarMapping(mapping.second, target, false);
+ }
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ /**
+ * Visits the input. If some correlated key variable is not live at this operator, the
+ * unnest-map is switched to propagate its input so the keys survive, and its type environment
+ * is recomputed for the changed output.
+ */
+ @Override
+ public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitSingleInputOperator(op);
+ Set<LogicalVariable> liveVars = new HashSet<>();
+ VariableUtilities.getLiveVariables(op, liveVars);
+ if (!liveVars.containsAll(correlatedKeyVars)) {
+ op.setPropagatesInput(true);
+ // Enabling input propagation changes the operator's output, so its previously computed
+ // type environment is stale; refresh it as visitExternalDataLookupOperator does.
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ }
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ visitSingleInputOperator(op);
+ List<LogicalVariable> distinctVarList = op.getDistinctByVarList();
+ for (LogicalVariable keyVar : correlatedKeyVars) {
+ if (!distinctVarList.contains(keyVar)) {
+ distinctVarList.add(keyVar);
+ }
+ }
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg)
+ throws AlgebricksException {
+ visitSingleInputOperator(op);
+ Set<LogicalVariable> liveVars = new HashSet<>();
+ VariableUtilities.getLiveVariables(op, liveVars);
+ if (!liveVars.containsAll(correlatedKeyVars)) {
+ op.setPropagateInput(true);
+ }
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ /**
+ * Wraps an AggregateOperator or RunningAggregateOperator with a group-by operator whose group-by
+ * keys are the current correlated key variables (each bound to a fresh variable).
+ * Note that the function here prevents this visitor being used to rewrite arbitrary query plans.
+ * Instead, it could only be used for rewriting a nested plan within a subplan operator.
+ *
+ * @param op
+ * the logical operator for aggregate or running aggregate.
+ * @return the wrapping group-by operator if the correlated key variables are not empty,
+ * and {@code op} otherwise.
+ * @throws AlgebricksException
+ */
+ private ILogicalOperator visitAggregateOperator(ILogicalOperator op) throws AlgebricksException {
+ visitSingleInputOperator(op);
+ if (correlatedKeyVars.isEmpty()) {
+ return op;
+ }
+ GroupByOperator gbyOp = new GroupByOperator();
+ // Iterates over a snapshot: updateInputToOutputVarMapping removes each key variable from
+ // correlatedKeyVars and adds its replacement, which would otherwise invalidate the iterator
+ // (ConcurrentModificationException as soon as there is more than one key).
+ List<LogicalVariable> keyVarsSnapshot = new ArrayList<LogicalVariable>(correlatedKeyVars);
+ for (LogicalVariable keyVar : keyVarsSnapshot) {
+ // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
+ // where the correlated key variables form a candidate key which can uniquely identify a tuple
+ // out of the nested-tuple-source.
+ LogicalVariable newVar = context.newVar();
+ gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyVar))));
+ updateInputToOutputVarMapping(keyVar, newVar, false);
+ }
+
+ // The group-by takes over the aggregate's former input...
+ ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+ gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
+
+ // ...and the aggregate now runs inside the group-by's nested plan, on top of a new NTS.
+ NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
+ op.getInputs().clear();
+ op.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+
+ ILogicalPlan nestedPlan = new ALogicalPlanImpl();
+ nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(op));
+ gbyOp.getNestedPlans().add(nestedPlan);
+
+ OperatorManipulationUtil.computeTypeEnvironmentBottomUp(gbyOp, context);
+ // The aggregate is now the root of the group-by's nested plan and sits on an NTS, so the
+ // group-by (not the aggregate) must replace it in the enclosing plan.
+ return gbyOp;
+ }
+
+ /**
+ * Rewrites every input of {@code op}, from the last one to the first, then substitutes remapped
+ * variables into {@code op}. Before each branch the current correlated key variables are saved and
+ * cleared; if visiting the branch produces no key variables, the saved ones are restored, so the
+ * keys of a branch that produced them are not lost while visiting the remaining branches.
+ * The saved ordering expressions are discarded up front.
+ */
+ private ILogicalOperator visitMultiInputOperator(ILogicalOperator op) throws AlgebricksException {
+ orderingExprs.clear();
+ Set<LogicalVariable> keyVarsFromPreviousBranches = new HashSet<LogicalVariable>();
+ List<Mutable<ILogicalOperator>> inputRefs = op.getInputs();
+ for (int branchIndex = inputRefs.size() - 1; branchIndex >= 0; branchIndex--) {
+ keyVarsFromPreviousBranches.addAll(correlatedKeyVars);
+ correlatedKeyVars.clear();
+
+ // Recursively rewrites this branch and installs the (possibly new) branch root.
+ Mutable<ILogicalOperator> branchRef = inputRefs.get(branchIndex);
+ branchRef.setValue(branchRef.getValue().accept(this, null));
+
+ if (correlatedKeyVars.isEmpty()) {
+ correlatedKeyVars.addAll(keyVarsFromPreviousBranches);
+ }
+ keyVarsFromPreviousBranches.clear();
+ }
+ subtituteVariables(op);
+ return op;
+ }
+
+ /**
+ * If {@code op} has exactly one input, rewrites it by visiting it and installs the returned operator
+ * as the new input; then applies the accumulated variable substitutions to {@code op}.
+ *
+ * @return {@code op} itself
+ */
+ private ILogicalOperator visitSingleInputOperator(ILogicalOperator op) throws AlgebricksException {
+ List<Mutable<ILogicalOperator>> inputRefs = op.getInputs();
+ if (inputRefs.size() == 1) {
+ Mutable<ILogicalOperator> inputRef = inputRefs.get(0);
+ inputRef.setValue(inputRef.getValue().accept(this, null));
+ }
+ subtituteVariables(op);
+ return op;
+ }
+
+ /**
+ * Applies to {@code op} first the subplan-input-to-current variable map and then each extra
+ * (old, new) substitution recorded in varMapIntroducedByRewriting.
+ */
+ private void subtituteVariables(ILogicalOperator op) throws AlgebricksException {
+ VariableUtilities.substituteVariables(op, subplanInputVarToCurrentVarMap, context);
+ for (Pair<LogicalVariable, LogicalVariable> extraMapping : varMapIntroducedByRewriting) {
+ LogicalVariable from = extraMapping.first;
+ LogicalVariable to = extraMapping.second;
+ VariableUtilities.substituteVariables(op, from, to, context);
+ }
+ }
+
+ /**
+ * Records that {@code oldVar} is replaced by {@code newVar}. The method:
+ * - swaps it in the correlated key variables;
+ * - substitutes it in the saved ordering expressions;
+ * - registers the mapping either in the two subplan-input maps (when {@code inNts} is set or the
+ * original subplan-input variable is already tracked there) or in varMapIntroducedByRewriting.
+ *
+ * @param oldVar
+ * the variable being replaced
+ * @param newVar
+ * the replacing variable
+ * @param inNts
+ * forces registration in the subplan-input maps (visitNestedTupleSourceOperator passes true)
+ */
+ private void updateInputToOutputVarMapping(LogicalVariable oldVar, LogicalVariable newVar, boolean inNts) {
+ if (correlatedKeyVars.remove(oldVar)) {
+ correlatedKeyVars.add(newVar);
+ }
+
+ for (Pair<IOrder, Mutable<ILogicalExpression>> savedOrder : orderingExprs) {
+ savedOrder.second.getValue().substituteVar(oldVar, newVar);
+ }
+
+ // Resolves oldVar back to the subplan-input variable it stands for, if any.
+ LogicalVariable inputVar = currentVarToSubplanInputVarMap.containsKey(oldVar)
+ ? currentVarToSubplanInputVarMap.get(oldVar) : oldVar;
+ boolean trackedAsSubplanInput = inNts || subplanInputVarToCurrentVarMap.containsKey(inputVar);
+ if (!trackedAsSubplanInput) {
+ varMapIntroducedByRewriting.add(new Pair<LogicalVariable, LogicalVariable>(inputVar, newVar));
+ return;
+ }
+ subplanInputVarToCurrentVarMap.put(inputVar, newVar);
+ currentVarToSubplanInputVarMap.put(newVar, inputVar);
+ }
+
+ /**
+ * Returns a new list holding, for each input pair, the same order kind and a fresh mutable that
+ * wraps a clone of the ordering expression; the input list is not modified.
+ */
+ private List<Pair<IOrder, Mutable<ILogicalExpression>>> cloneOrderingExpression(
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs) {
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> copies = new ArrayList<>(orderExprs.size());
+ for (Pair<IOrder, Mutable<ILogicalExpression>> source : orderExprs) {
+ ILogicalExpression exprCopy = source.second.getValue().cloneExpression();
+ Mutable<ILogicalExpression> copyRef = new MutableObject<ILogicalExpression>(exprCopy);
+ copies.add(new Pair<IOrder, Mutable<ILogicalExpression>>(source.first, copyRef));
+ }
+ return copies;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
new file mode 100644
index 0000000..191a2f9
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/*
+ This visitor inlines the input <code>nts</code> in the query plan rooted
+ at the operator being visited, with the query plan rooted at the input
+ <code>subplanInputOperator</code>.
+
+ The visitor ensures that:
+ 1. live variables at <code>subplanInputOperator</code> are
+ propagated to the top-most join operator in the query plan rooted
+ at the operator being visited;
+ 2. no tuple available at <code>subplanInputOperator</code> gets lost along the
+ pipeline to the top-most join operator in the query plan rooted
+ at the operator being visited.
+*/
+class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisitor<ILogicalOperator, Void> {
+ // The optimization context (used for fresh variables and type-environment computation).
+ private final IOptimizationContext context;
+
+ // The input operator to the subplan; it replaces targetNts when that NTS is reached.
+ private final ILogicalOperator subplanInputOperator;
+
+ // The target NTS operator, matched by reference identity in visitNestedTupleSourceOperator.
+ private final ILogicalOperator targetNts;
+
+ // The subplan-local live variables of <code>subplanInputOperator</code>, collected in the constructor.
+ // After the rewrite they are appended to order/project/distinct operators that lie below a join, and
+ // unnest-map/external-lookup operators propagate their input when any of them is not live.
+ private final Set<LogicalVariable> liveVarsFromSubplanInput = new HashSet<>();
+
+ // Becomes true once targetNts has been replaced by subplanInputOperator; it is never reset.
+ private boolean rewritten = false;
+
+ // Becomes true once any join operator has been visited; it is never reset. The order, project and
+ // distinct visitors read it before descending into their input, so the captured value tells whether
+ // a join had been visited before reaching that operator (i.e. the operator lies beneath a join).
+ private boolean hasJoinAncestor = false;
+
+ // Variables assigned TRUE on the right branch of each inner join converted into a left-outer join.
+ // Presumably used by the caller for not-null checks in a final group-by operator (verify at call site).
+ private Set<LogicalVariable> nullCheckVars = new HashSet<LogicalVariable>();
+
+ // Input slot of a single-input operator that holds a join; see visitSingleInputOperator for how it is
+ // chosen. Remains null if no single-input operator has a join as its (rewritten) child.
+ private Mutable<ILogicalOperator> topJoinRef;
+
+ /**
+ * @param context
+ * the optimization context
+ * @param subplanInputOperator
+ * the input operator to the target SubplanOperator
+ * @param nts
+ * the NestedTupleSourceOperator to be replaced by <code>subplanInputOperator</code>
+ * @throws AlgebricksException
+ * if collecting the live variables of <code>subplanInputOperator</code> fails
+ */
+ public InlineLeftNtsInSubplanJoinFlatteningVisitor(IOptimizationContext context,
+ ILogicalOperator subplanInputOperator, ILogicalOperator nts) throws AlgebricksException {
+ this.context = context;
+ this.subplanInputOperator = subplanInputOperator;
+ this.targetNts = nts;
+ VariableUtilities.getSubplanLocalLiveVariables(subplanInputOperator, liveVarsFromSubplanInput);
+ }
+
+ /**
+ * @return the variables assigned TRUE on the right branch of every inner join that was turned into a
+ * left-outer join; they indicate whether a tuple from that right branch is a non-match.
+ */
+ public Set<LogicalVariable> getNullCheckVariables() {
+ return nullCheckVars;
+ }
+
+ /**
+ * @return the join reference recorded while visiting (see visitSingleInputOperator); intended to be the
+ * top-most join operator of the query plan rooted at the operator being visited. It is null when
+ * no single-input operator had a join as its child (e.g. the visited root itself is the join).
+ */
+ public Mutable<ILogicalOperator> getTopJoinReference() {
+ return topJoinRef;
+ }
+
+ // The following visitors only recurse into their (single) input.
+
+ @Override
+ public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
+ throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
+ throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ // Only the group-by's input is visited; its nested plans are not traversed.
+ @Override
+ public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ /**
+ * Visits the left branch, and the right branch only if <code>rewritten</code> is still false afterwards.
+ * If the rewrite happened, the join is replaced by a left-outer join (branches swapped first when the
+ * right branch was the one visited last) and a TRUE-valued marker variable is put on its right branch.
+ */
+ @Override
+ public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+ hasJoinAncestor = true;
+ boolean needToSwitch = false;
+ for (int i = 0; i < op.getInputs().size(); ++i) {
+ // Recursively rewrites the i-th branch.
+ ILogicalOperator newChild = op.getInputs().get(i).getValue().accept(this, null);
+ op.getInputs().get(i).setValue(newChild);
+ if (i == 1) {
+ needToSwitch = true;
+ }
+ if (rewritten) {
+ break;
+ }
+ }
+
+ // If the loop reached the right branch and the rewrite has happened, swaps the branches so that
+ // the rewritten branch becomes the left (outer) one.
+ if (rewritten && needToSwitch) {
+ Mutable<ILogicalOperator> leftBranch = op.getInputs().get(0);
+ Mutable<ILogicalOperator> rightBranch = op.getInputs().get(1);
+ op.getInputs().set(0, rightBranch);
+ op.getInputs().set(1, leftBranch);
+ }
+ AbstractBinaryJoinOperator returnOp = op;
+ // After rewriting, the original inner join becomes a left-outer join with the same condition and inputs,
+ // so tuples coming from the rewritten (left) branch are not dropped when they have no match.
+ if (rewritten) {
+ returnOp = new LeftOuterJoinOperator(op.getCondition());
+ returnOp.getInputs().addAll(op.getInputs());
+ injectNullCheckVars(returnOp);
+ }
+ return returnOp;
+ }
+
+ @Override
+ public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+ hasJoinAncestor = true;
+ // Only rewrites the left child; the join itself is kept as is.
+ ILogicalOperator newChild = op.getInputs().get(0).getValue().accept(this, null);
+ op.getInputs().get(0).setValue(newChild);
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
+ throws AlgebricksException {
+ if (op == targetNts) {
+ // Inlines the actual <code>subplanInputOperator</code>.
+ rewritten = true;
+ return subplanInputOperator;
+ } else {
+ return op;
+ }
+ }
+
+ @Override
+ public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+ // Captured before visiting the input: whether any join had been visited before reaching this operator.
+ boolean underJoin = hasJoinAncestor;
+ visitSingleInputOperator(op);
+ if (!rewritten || !underJoin) {
+ return op;
+ }
+
+ // Adjust the ordering if its input operator pipeline has been rewritten.
+ List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprList = new ArrayList<>();
+ // Prefixes the sort keys with the live variables of the subplan input (ascending).
+ for (LogicalVariable liveVar : liveVarsFromSubplanInput) {
+ orderExprList.add(new Pair<IOrder, Mutable<ILogicalExpression>>(OrderOperator.ASC_ORDER,
+ new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
+ }
+ orderExprList.addAll(op.getOrderExpressions());
+
+ // Creates an order operator with the new expression list.
+ OrderOperator orderOp = new OrderOperator(orderExprList);
+ orderOp.getInputs().addAll(op.getInputs());
+ context.computeAndSetTypeEnvironmentForOperator(orderOp);
+ return orderOp;
+ }
+
+ @Override
+ public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+ visitSingleInputOperator(op);
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+ // Captured before visiting the input: whether any join had been visited before reaching this operator.
+ boolean underJoin = hasJoinAncestor;
+ visitSingleInputOperator(op);
+ if (!rewritten || !underJoin) {
+ return op;
+ }
+ // Adds every subplan-input live variable that the projection would otherwise drop.
+ for (LogicalVariable keyVar : liveVarsFromSubplanInput) {
+ if (!op.getVariables().contains(keyVar)) {
+ op.getVariables().add(keyVar);
+ }
+ }
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
+ throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+ throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
+ }
+
+ // Only the subplan's input is visited; its nested plans are not traversed.
+ @Override
+ public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+ throw new UnsupportedOperationException(
+ "Nested subplans with a union operator should have been disqualified for this rewriting!");
+ }
+
+ @Override
+ public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+ visitSingleInputOperator(op);
+ if (!rewritten) {
+ return op;
+ }
+ // After the rewrite, propagates the input when some subplan-input live variable is not live here.
+ // NOTE(review): unlike visitExternalDataLookupOperator, the type environment is not recomputed after
+ // enabling input propagation; confirm that a later pass recomputes it.
+ Set<LogicalVariable> liveVars = new HashSet<>();
+ VariableUtilities.getLiveVariables(op, liveVars);
+ if (!liveVars.containsAll(liveVarsFromSubplanInput)) {
+ op.setPropagatesInput(true);
+ }
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+ // Captured before visiting the input: whether any join had been visited before reaching this operator.
+ boolean underJoin = hasJoinAncestor;
+ visitSingleInputOperator(op);
+ if (!rewritten || !underJoin) {
+ return op;
+ }
+ // Extends the distinct-by list with every missing subplan-input live variable.
+ List<LogicalVariable> distinctVarList = op.getDistinctByVarList();
+ for (LogicalVariable keyVar : liveVarsFromSubplanInput) {
+ if (!distinctVarList.contains(keyVar)) {
+ distinctVarList.add(keyVar);
+ }
+ }
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ @Override
+ public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg)
+ throws AlgebricksException {
+ visitSingleInputOperator(op);
+ if (!rewritten) {
+ return op;
+ }
+ // After the rewrite, propagates the input when some subplan-input live variable is not live here,
+ // and refreshes the type environment.
+ Set<LogicalVariable> liveVars = new HashSet<>();
+ VariableUtilities.getLiveVariables(op, liveVars);
+ if (!liveVars.containsAll(liveVarsFromSubplanInput)) {
+ op.setPropagateInput(true);
+ }
+ context.computeAndSetTypeEnvironmentForOperator(op);
+ return op;
+ }
+
+ @Override
+ public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+ return visitSingleInputOperator(op);
+ }
+
+ /**
+ * Visits the single input of <code>op</code> (operators with zero or several inputs are returned untouched)
+ * and installs the returned operator as the new input. If no join reference has been recorded yet and the
+ * new input is an inner or left-outer join, records this input slot as <code>topJoinRef</code>.
+ * NOTE(review): the check runs after the recursive call and only while topJoinRef is null, so the first
+ * slot recorded while the recursion unwinds is the lowest join that sits directly beneath a single-input
+ * operator. That is not necessarily the top-most join promised by getTopJoinReference(); confirm that
+ * the plan shapes admitted for this rewriting cannot contain a single-input operator between two joins.
+ */
+ private ILogicalOperator visitSingleInputOperator(ILogicalOperator op) throws AlgebricksException {
+ if (op.getInputs().size() == 1) {
+ // Deals with single input operators.
+ Mutable<ILogicalOperator> childRef = op.getInputs().get(0);
+ ILogicalOperator newChild = childRef.getValue().accept(this, null);
+ if (topJoinRef == null) {
+ LogicalOperatorTag childOpTag = newChild.getOperatorTag();
+ if (childOpTag == LogicalOperatorTag.INNERJOIN || childOpTag == LogicalOperatorTag.LEFTOUTERJOIN) {
+ topJoinRef = childRef;
+ }
+ }
+ op.getInputs().get(0).setValue(newChild);
+ }
+ return op;
+ }
+
+ /**
+ * Injects a variable indicating non-matches for the right branch of a left-outer join. A fresh variable
+ * is assigned TRUE by a new assign operator placed on top of the right branch of <code>joinOp</code>,
+ * and the variable is recorded in <code>nullCheckVars</code>. The new assign's type environment is not
+ * computed here.
+ *
+ * @param joinOp
+ * the left-outer join operator.
+ */
+ private void injectNullCheckVars(AbstractBinaryJoinOperator joinOp) {
+ LogicalVariable assignVar = context.newVar();
+ ILogicalOperator assignOp = new AssignOperator(assignVar,
+ new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+ assignOp.getInputs().add(joinOp.getInputs().get(1));
+ joinOp.getInputs().set(1, new MutableObject<ILogicalOperator>(assignOp));
+ nullCheckVars.add(assignVar);
+ }
+
+}