You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/01/28 05:46:46 UTC

[7/7] incubator-asterixdb git commit: ASTERIXDB-1005, ASTERIXDB-1263: Clean up subplan flattening: 1. Inline NestedTupleSource and remove SubplanOperator for special cases that join operators inside the SubplanOperator can be re-targeted for correl

ASTERIXDB-1005, ASTERIXDB-1263: Clean up subplan flattening:
1. Inline NestedTupleSource and remove SubplanOperator for special cases that
   join operators inside the SubplanOperator can be re-targeted for
   correlations;
2. Blindly inline NestedTupleSource and remove SubplanOperator for general cases
   where the condition of the special case is not met.

Change-Id: I4dd130a25f3c81272cc23f844ea20e376e990612
Reviewed-on: https://asterix-gerrit.ics.uci.edu/579
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/947fc3cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/947fc3cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/947fc3cb

Branch: refs/heads/master
Commit: 947fc3cb209a6e7e37b435ea35f0d856f3c35dc7
Parents: bf34d18
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Wed Jan 27 20:14:47 2016 -0800
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Wed Jan 27 20:42:09 2016 -0800

----------------------------------------------------------------------
 .../asterix/optimizer/base/RuleCollections.java |   4 -
 .../CancelUnnestWithNestedListifyRule.java      |   8 +
 .../subplan/InlineAllNtsInSubplanVisitor.java   | 705 +++++++++++++++++++
 ...neLeftNtsInSubplanJoinFlatteningVisitor.java | 413 +++++++++++
 ...ineSubplanInputForNestedTupleSourceRule.java | 698 ++++++++----------
 .../rules/subplan/SubplanFlatteningUtil.java    | 203 ++++++
 .../SubplanSpecialFlatteningCheckVisitor.java   | 269 +++++++
 .../rules/util/EquivalenceClassUtils.java       |  56 ++
 asterix-app/data/restaurants/restaurants.adm    |  30 +
 .../optimizerts/queries/nested_loj2.aql         |   2 +-
 .../optimizerts/queries/nested_loj3.aql         |   2 +-
 .../optimizerts/queries/nested_loj4.aql         |  79 +++
 .../queries/query-ASTERIXDB-1005.aql            |  59 ++
 .../queries/query-ASTERIXDB-1263.aql            |  53 ++
 ...join-probe-pidx-with-join-btree-sidx_01.plan |  63 +-
 ...join-probe-pidx-with-join-btree-sidx_02.plan |  63 +-
 .../optimizerts/results/fj-phase1.plan          |  52 +-
 .../results/inverted-index-join/issue741.plan   |  79 +--
 ...dx-with-join-edit-distance-check-idx_01.plan | 139 ++--
 ...obe-pidx-with-join-jaccard-check-idx_01.plan |  87 ++-
 .../optimizerts/results/loj-super-key_01.plan   |  10 +-
 .../optimizerts/results/loj-super-key_02.plan   |   8 +-
 .../optimizerts/results/nest_aggregate.plan     | 109 ++-
 ...join-probe-pidx-with-join-btree-sidx_01.plan |  77 +-
 ...join-probe-pidx-with-join-btree-sidx_02.plan |  77 +-
 ...dx-with-join-edit-distance-check-idx_01.plan | 171 +++--
 ...obe-pidx-with-join-jaccard-check-idx_01.plan | 105 ++-
 ...join-probe-pidx-with-join-rtree-sidx_01.plan |  14 +-
 ...join-probe-pidx-with-join-rtree-sidx_02.plan |  14 +-
 ...in-probe-pidx-with-join-btree-sidx_01_1.plan |  77 +-
 ...in-probe-pidx-with-join-btree-sidx_01_2.plan |  77 +-
 ...in-probe-pidx-with-join-btree-sidx_02_1.plan |  77 +-
 ...in-probe-pidx-with-join-btree-sidx_02_2.plan |  77 +-
 ...dx-with-join-edit-distance-check-idx_01.plan | 171 +++--
 ...join-probe-pidx-with-join-rtree-sidx_01.plan |  14 +-
 ...join-probe-pidx-with-join-rtree-sidx_02.plan |  14 +-
 .../optimizerts/results/nested_loj2.plan        |  26 +-
 .../optimizerts/results/nested_loj3.plan        |  36 +-
 .../optimizerts/results/nested_loj4.plan        |  39 +
 ...in-probe-pidx-with-join-btree-sidx_01_1.plan |  63 +-
 ...in-probe-pidx-with-join-btree-sidx_01_2.plan |  63 +-
 ...in-probe-pidx-with-join-btree-sidx_02_1.plan |  63 +-
 ...in-probe-pidx-with-join-btree-sidx_02_2.plan |  63 +-
 ...dx-with-join-edit-distance-check-idx_01.plan | 139 ++--
 ...join-probe-pidx-with-join-rtree-sidx_01.plan |  67 +-
 ...join-probe-pidx-with-join-rtree-sidx_02.plan |  67 +-
 .../results/push-project-through-group.plan     |  49 +-
 .../optimizerts/results/q08_group_by.plan       |  20 +-
 .../results/query-ASTERIXDB-1005.plan           |  35 +
 .../results/query-ASTERIXDB-1263.plan           |  35 +
 .../optimizerts/results/query-issue562.plan     |  79 ++-
 .../optimizerts/results/query_issue849-2.plan   |   9 +-
 .../optimizerts/results/query_issue849.plan     |   4 +-
 .../results/rtree-index-join/issue730.plan      |  65 +-
 ...join-probe-pidx-with-join-rtree-sidx_01.plan |  67 +-
 ...join-probe-pidx-with-join-rtree-sidx_02.plan |  67 +-
 .../split-materialization-above-join.plan       | 211 +++---
 .../results/udfs/query-ASTERIXDB-1017-2.plan    |  44 +-
 .../udfs/query-ASTERIXDB-1017-recursive-2.plan  |  44 +-
 .../udfs/query-ASTERIXDB-1017-recursive.plan    |  56 +-
 .../results/udfs/query-ASTERIXDB-1017.plan      |  69 +-
 .../results/udfs/query-ASTERIXDB-1018.plan      | 119 ++--
 .../results/udfs/query-ASTERIXDB-1019.plan      | 119 ++--
 .../results/udfs/query-ASTERIXDB-1020.plan      |  83 ++-
 .../results/udfs/query-ASTERIXDB-1029.plan      |  85 +--
 .../results/udfs/query-ASTERIXDB-1029_2.plan    |  85 +--
 .../query-ASTERIXDB-1063.23.query.aql           |  29 +
 .../nest_aggregate2/nest_aggregate2.1.ddl.aql   |  88 +++
 .../nest_aggregate2.2.update.aql                |  50 ++
 .../nest_aggregate2/nest_aggregate2.3.query.aql |  45 ++
 .../query-ASTERIXDB-1005.1.ddl.sqlpp            |  53 ++
 .../query-ASTERIXDB-1005.2.update.sqlpp         |  31 +
 .../query-ASTERIXDB-1005.3.query.sqlpp          |  31 +
 .../query-ASTERIXDB-1063.23.query.sqlpp         |  30 +
 .../nest_aggregate2/nest_aggregate2.1.ddl.sqlpp |  94 +++
 .../nest_aggregate2.2.update.sqlpp              |  40 ++
 .../nest_aggregate2.3.query.sqlpp               |  46 ++
 .../query-ASTERIXDB-1005.1.adm                  |  15 +
 .../overlap_bins_gby_3/overlap_bins_gby_3.1.adm |  84 +--
 .../tinysocial-suite/tinysocial-suite.23.adm    |   1 +
 .../tpch/nest_aggregate2/nest_aggregate2.1.adm  |  11 +
 .../query-ASTERIXDB-1263.23.ast                 |  61 ++
 .../src/test/resources/runtimets/testsuite.xml  |   5 +
 .../resources/runtimets/testsuite_sqlpp.xml     |  10 +
 .../metadata/feeds/FeedMetadataUtil.java        |   2 +-
 85 files changed, 4682 insertions(+), 2171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index 260fb3c..4187059 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -118,8 +118,6 @@ import org.apache.hyracks.algebricks.rewriter.rules.SetExecutionModeRule;
 import org.apache.hyracks.algebricks.rewriter.rules.SimpleUnnestToProductRule;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateSubplanRule;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.EliminateSubplanWithInputCardinalityOneRule;
-import org.apache.hyracks.algebricks.rewriter.rules.subplan.IntroduceGroupByForSubplanRule;
-import org.apache.hyracks.algebricks.rewriter.rules.subplan.IntroduceLeftOuterJoinForSubplanRule;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.NestedSubplanToJoinRule;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.PushSubplanIntoGroupByRule;
 import org.apache.hyracks.algebricks.rewriter.rules.subplan.SubplanOutOfGroupRule;
@@ -190,9 +188,7 @@ public final class RuleCollections {
         condPushDownAndJoinInference.add(new IntroJoinInsideSubplanRule());
         condPushDownAndJoinInference.add(new PushMapOperatorDownThroughProductRule());
         condPushDownAndJoinInference.add(new PushSubplanWithAggregateDownThroughProductRule());
-        condPushDownAndJoinInference.add(new IntroduceGroupByForSubplanRule());
         condPushDownAndJoinInference.add(new SubplanOutOfGroupRule());
-        condPushDownAndJoinInference.add(new IntroduceLeftOuterJoinForSubplanRule());
         condPushDownAndJoinInference.add(new AsterixExtractFunctionsFromJoinConditionRule());
 
         condPushDownAndJoinInference.add(new RemoveRedundantVariablesRule());

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
index 558fa88..2de6bc4 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/CancelUnnestWithNestedListifyRule.java
@@ -49,8 +49,11 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RunningAggregatePOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.UnpartitionedPropertyComputer;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
+import com.google.common.collect.ImmutableSet;
+
 /**
  * This rule cancels the unnest with the nested listify. Formally, the following plan<br/>
  *
@@ -195,6 +198,11 @@ public class CancelUnnestWithNestedListifyRule implements IAlgebraicRewriteRule
             return false;
         }
 
+        if (OperatorManipulationUtil.ancestorOfOperators(agg, ImmutableSet.of(LogicalOperatorTag.LIMIT,
+                LogicalOperatorTag.ORDER, LogicalOperatorTag.GROUP, LogicalOperatorTag.DISTINCT))) {
+            return false;
+        }
+
         LogicalVariable aggVar = agg.getVariables().get(0);
         ILogicalExpression aggFun = agg.getExpressions().get(0).getValue();
         if (!aggVar.equals(unnestedVar)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
new file mode 100644
index 0000000..3489982
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -0,0 +1,705 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.common.utils.Triple;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/*
+    This visitor inlines all nested tuple source operators in the query
+    plan rooted at the operator being visited, with a deep copy of the query
+    plan rooted at the input <code>subplanInputOperator</code>.
+
+    The visitor ensures that the variables used to correlate between the
+    query plan rooted at <code>subplanInputOperator</code> are propagated
+    to the operator being visited.
+
+    ----------------------------------
+    Here is an abstract example.
+    The original query plan:
+    --Op1
+      --Subplan{
+        --AggregateOp
+          --NestedOp
+            .....
+              --Nested-Tuple-Source
+        }
+        --InputOp
+          .....
+
+    After we call NestedOp.accept(....) with this visitor. We will get an
+    intermediate plan that looks like:
+    --Op1
+      --Subplan{
+        --AggregateOp
+          --NestedOp
+            .....
+              --InputOp'
+                ....
+        }
+        --InputOp
+          .....
+     The plan rooted at InputOp' is a deep copy of the plan rooted at InputOp
+     with a different set of variables.
+
+ */
+class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOperator, Void> {
+    // The optimization context.
+    private final IOptimizationContext context;
+
+    // The target SubplanOperator.
+    private final ILogicalOperator subplanOperator;
+
+    // The input operator to the subplan.
+    private final ILogicalOperator subplanInputOperator;
+
+    // Maps live variables at <code>subplanInputOperator</code> to variables in the flattened nested plan.
+    private final Map<LogicalVariable, LogicalVariable> subplanInputVarToCurrentVarMap = new HashMap<>();
+
+    // Maps variables in the flattened nested plan to live variables at <code>subplannputOperator</code>.
+    private final Map<LogicalVariable, LogicalVariable> currentVarToSubplanInputVarMap = new HashMap<>();
+
+    // The set of key variables at the current operator that is being visited.
+    private final Set<LogicalVariable> correlatedKeyVars = new HashSet<>();
+
+    // The list of variables determining the ordering.
+    private final List<Pair<IOrder, Mutable<ILogicalExpression>>> orderingExprs = new ArrayList<>();
+
+    // Maps variables in the flattened nested plan to live variables at <code>subplannputOperator</code>.
+    private final List<Pair<LogicalVariable, LogicalVariable>> varMapIntroducedByRewriting = new ArrayList<>();
+
+    /**
+     * @param context
+     *            the optimization context
+     * @param subplanInputOperator
+     *            the input operator to the target subplan operator, which is to be inlined.
+     * @throws AlgebricksException
+     */
+    public InlineAllNtsInSubplanVisitor(IOptimizationContext context, ILogicalOperator subplanOperator)
+            throws AlgebricksException {
+        this.context = context;
+        this.subplanOperator = subplanOperator;
+        this.subplanInputOperator = subplanOperator.getInputs().get(0).getValue();
+    }
+
+    public Map<LogicalVariable, LogicalVariable> getInputVariableToOutputVariableMap() {
+        return subplanInputVarToCurrentVarMap;
+    }
+
+    public List<Pair<LogicalVariable, LogicalVariable>> getVariableMapHistory() {
+        return varMapIntroducedByRewriting;
+    }
+
+    public List<Pair<IOrder, Mutable<ILogicalExpression>>> getOrderingExpressions() {
+        return orderingExprs;
+    }
+
+    @Override
+    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        return visitAggregateOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
+            throws AlgebricksException {
+        return visitAggregateOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        Set<LogicalVariable> groupKeyVars = new HashSet<>();
+        // Maps group by key variables if the corresponding expressions are VariableReferenceExpressions.
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> keyVarExprRef : op.getGroupByList()) {
+            ILogicalExpression expr = keyVarExprRef.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+                LogicalVariable sourceVar = varExpr.getVariableReference();
+                updateInputToOutputVarMapping(sourceVar, keyVarExprRef.first, false);
+                groupKeyVars.add(keyVarExprRef.first);
+            }
+        }
+
+        // Add correlated key variables into group-by keys.
+        Map<LogicalVariable, LogicalVariable> addedGroupKeyMapping = new HashMap<>();
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            if (!groupKeyVars.contains(keyVar)) {
+                LogicalVariable newVar = context.newVar();
+                op.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+                        new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyVar))));
+                addedGroupKeyMapping.put(keyVar, newVar);
+            }
+        }
+
+        // Updates decor list.
+        Iterator<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorExprIter = op.getDecorList().iterator();
+        while (decorExprIter.hasNext()) {
+            ILogicalExpression expr = decorExprIter.next().second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+                if (correlatedKeyVars.contains(varExpr.getVariableReference())) {
+                    decorExprIter.remove();
+                }
+            }
+        }
+
+        // Updates the var mapping for added group-by keys.
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : addedGroupKeyMapping.entrySet()) {
+            updateInputToOutputVarMapping(entry.getKey(), entry.getValue(), false);
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        // Processes its input operator.
+        visitSingleInputOperator(op);
+        if (correlatedKeyVars.isEmpty()) {
+            return op;
+        }
+
+        // Get live variables before limit.
+        Set<LogicalVariable> inputLiveVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getSubplanLocalLiveVariables(op.getInputs().get(0).getValue(), inputLiveVars);
+
+        // Creates a record construction assign operator.
+        Pair<ILogicalOperator, LogicalVariable> assignOpAndRecordVar = createRecordConstructorAssignOp(inputLiveVars);
+        ILogicalOperator assignOp = assignOpAndRecordVar.first;
+        LogicalVariable recordVar = assignOpAndRecordVar.second;
+        ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+        assignOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
+
+        // Rewrites limit to a group-by with limit as its nested operator.
+        Pair<ILogicalOperator, LogicalVariable> gbyOpAndAggVar = wrapLimitInGroupBy(op, recordVar, inputLiveVars);
+        ILogicalOperator gbyOp = gbyOpAndAggVar.first;
+        LogicalVariable aggVar = gbyOpAndAggVar.second;
+        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(assignOp));
+
+        // Adds an unnest operators on top of the group-by operator.
+        Pair<ILogicalOperator, LogicalVariable> unnestOpAndUnnestVar = createUnnestForAggregatedList(aggVar);
+        ILogicalOperator unnestOp = unnestOpAndUnnestVar.first;
+        LogicalVariable unnestVar = unnestOpAndUnnestVar.second;
+        unnestOp.getInputs().add(new MutableObject<ILogicalOperator>(gbyOp));
+
+        // Adds field accesses to recover input live variables.
+        ILogicalOperator fieldAccessAssignOp = createFieldAccessAssignOperator(unnestVar, inputLiveVars);
+        fieldAccessAssignOp.getInputs().add(new MutableObject<ILogicalOperator>(unnestOp));
+
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(fieldAccessAssignOp, context);
+        return fieldAccessAssignOp;
+    }
+
+    private Pair<ILogicalOperator, LogicalVariable> createRecordConstructorAssignOp(
+            Set<LogicalVariable> inputLiveVars) {
+        // Creates a nested record.
+        List<Mutable<ILogicalExpression>> recordConstructorArgs = new ArrayList<>();
+        for (LogicalVariable inputLiveVar : inputLiveVars) {
+            if (!correlatedKeyVars.contains(inputLiveVar)) {
+                recordConstructorArgs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+                        new AsterixConstantValue(new AString(Integer.toString(inputLiveVar.getId()))))));
+                recordConstructorArgs
+                        .add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(inputLiveVar)));
+            }
+        }
+        LogicalVariable recordVar = context.newVar();
+        Mutable<ILogicalExpression> recordExprRef = new MutableObject<ILogicalExpression>(
+                new ScalarFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR),
+                        recordConstructorArgs));
+        AssignOperator assignOp = new AssignOperator(recordVar, recordExprRef);
+        return new Pair<ILogicalOperator, LogicalVariable>(assignOp, recordVar);
+    }
+
+    private Pair<ILogicalOperator, LogicalVariable> wrapLimitInGroupBy(ILogicalOperator op, LogicalVariable recordVar,
+            Set<LogicalVariable> inputLiveVars) throws AlgebricksException {
+        GroupByOperator gbyOp = new GroupByOperator();
+        List<Pair<LogicalVariable, LogicalVariable>> keyVarNewVarPairs = new ArrayList<>();
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
+            // where the keyVarsToEnforce forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
+            LogicalVariable newVar = context.newVar();
+            gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyVar))));
+            keyVarNewVarPairs.add(new Pair<LogicalVariable, LogicalVariable>(keyVar, newVar));
+        }
+
+        // Creates an aggregate operator doing LISTIFY, as the root of the nested plan of the added group-by operator.
+        List<LogicalVariable> aggVarList = new ArrayList<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> aggExprList = new ArrayList<Mutable<ILogicalExpression>>();
+        LogicalVariable aggVar = context.newVar();
+        List<Mutable<ILogicalExpression>> aggArgList = new ArrayList<>();
+        aggVarList.add(aggVar);
+        // Creates an aggregation function expression.
+        aggArgList.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(recordVar)));
+        ILogicalExpression aggExpr = new AggregateFunctionCallExpression(
+                FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.LISTIFY), false, aggArgList);
+        aggExprList.add(new MutableObject<ILogicalExpression>(aggExpr));
+        AggregateOperator aggOp = new AggregateOperator(aggVarList, aggExprList);
+
+        // Adds the original limit operator as the input operator to the added aggregate operator.
+        aggOp.getInputs().add(new MutableObject<ILogicalOperator>(op));
+        op.getInputs().clear();
+        ILogicalOperator currentOp = op;
+        if (!orderingExprs.isEmpty()) {
+            OrderOperator orderOp = new OrderOperator(cloneOrderingExpression(orderingExprs));
+            op.getInputs().add(new MutableObject<ILogicalOperator>(orderOp));
+            currentOp = orderOp;
+        }
+
+        // Adds a nested tuple source operator as the input operator to the limit operator.
+        NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
+        currentOp.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+
+        // Sets the root of the added nested plan to the aggregate operator.
+        ILogicalPlan nestedPlan = new ALogicalPlanImpl();
+        nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(aggOp));
+
+        // Sets the nested plan for the added group-by operator.
+        gbyOp.getNestedPlans().add(nestedPlan);
+
+        // Updates variable mapping for ancestor operators.
+        for (Pair<LogicalVariable, LogicalVariable> keyVarNewVar : keyVarNewVarPairs) {
+            updateInputToOutputVarMapping(keyVarNewVar.first, keyVarNewVar.second, false);
+        }
+        return new Pair<ILogicalOperator, LogicalVariable>(gbyOp, aggVar);
+    }
+
+    private Pair<ILogicalOperator, LogicalVariable> createUnnestForAggregatedList(LogicalVariable aggVar) {
+        LogicalVariable unnestVar = context.newVar();
+        // Creates an unnest function expression.
+        Mutable<ILogicalExpression> unnestArg = new MutableObject<ILogicalExpression>(
+                new VariableReferenceExpression(aggVar));
+        List<Mutable<ILogicalExpression>> unnestArgList = new ArrayList<Mutable<ILogicalExpression>>();
+        unnestArgList.add(unnestArg);
+        Mutable<ILogicalExpression> unnestExpr = new MutableObject<ILogicalExpression>(
+                new UnnestingFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.SCAN_COLLECTION), unnestArgList));
+        ILogicalOperator unnestOp = new UnnestOperator(unnestVar, unnestExpr);
+        return new Pair<ILogicalOperator, LogicalVariable>(unnestOp, unnestVar);
+    }
+
+    private ILogicalOperator createFieldAccessAssignOperator(LogicalVariable recordVar,
+            Set<LogicalVariable> inputLiveVars) {
+        List<LogicalVariable> fieldAccessVars = new ArrayList<>();
+        List<Mutable<ILogicalExpression>> fieldAccessExprs = new ArrayList<>();
+        //Adds field access by name.
+        for (LogicalVariable inputLiveVar : inputLiveVars) {
+            if (!correlatedKeyVars.contains(inputLiveVar)) {
+                // field Var
+                LogicalVariable newVar = context.newVar();
+                fieldAccessVars.add(newVar);
+                // fieldAcess expr
+                List<Mutable<ILogicalExpression>> argRefs = new ArrayList<>();
+                argRefs.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(recordVar)));
+                argRefs.add(new MutableObject<ILogicalExpression>(new ConstantExpression(
+                        new AsterixConstantValue(new AString(Integer.toString(inputLiveVar.getId()))))));
+                fieldAccessExprs.add(new MutableObject<ILogicalExpression>(new ScalarFunctionCallExpression(
+                        FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.FIELD_ACCESS_BY_NAME), argRefs)));
+                // Updates variable mapping for ancestor operators.
+                updateInputToOutputVarMapping(inputLiveVar, newVar, false);
+            }
+        }
+        AssignOperator fieldAccessAssignOp = new AssignOperator(fieldAccessVars, fieldAccessExprs);
+        return fieldAccessAssignOp;
+    }
+
+    @Override
+    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        return visitMultiInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        return visitMultiInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        if (op.getDataSourceReference().getValue() != subplanOperator) {
+            return op;
+        }
+        LogicalOperatorDeepCopyWithNewVariablesVisitor deepCopyVisitor = new LogicalOperatorDeepCopyWithNewVariablesVisitor(
+                context);
+        ILogicalOperator copiedInputOperator = deepCopyVisitor.deepCopy(subplanInputOperator, null);
+
+        // Updates the primary key info in the copied plan segment.
+        Map<LogicalVariable, LogicalVariable> varMap = deepCopyVisitor.getInputToOutputVariableMapping();
+        context.updatePrimaryKeys(varMap);
+
+        correlatedKeyVars.clear();
+        correlatedKeyVars.addAll(EquivalenceClassUtils.findFDHeaderVariables(context, subplanInputOperator));
+        // Update key variables and input-output-var mapping.
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+            LogicalVariable oldVar = entry.getKey();
+            LogicalVariable newVar = entry.getValue();
+            if (correlatedKeyVars.contains(oldVar)) {
+                correlatedKeyVars.remove(oldVar);
+                correlatedKeyVars.add(newVar);
+            }
+            updateInputToOutputVarMapping(oldVar, newVar, true);
+        }
+        return copiedInputOperator;
+    }
+
+    @Override
+    public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        if (correlatedKeyVars.isEmpty()) {
+            return op;
+        }
+
+        orderingExprs.clear();
+        orderingExprs.addAll(cloneOrderingExpression(op.getOrderExpressions()));
+
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprList = new ArrayList<>();
+        // Adds keyVars to the prefix of sorting columns.
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            orderExprList.add(new Pair<IOrder, Mutable<ILogicalExpression>>(OrderOperator.ASC_ORDER,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyVar))));
+        }
+        orderExprList.addAll(op.getOrderExpressions());
+
+        // Creates an order operator with the new expression list.
+        OrderOperator orderOp = new OrderOperator(orderExprList);
+        orderOp.getInputs().addAll(op.getInputs());
+        context.computeAndSetTypeEnvironmentForOperator(orderOp);
+        return orderOp;
+    }
+
+    @Override
+    public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        List<Mutable<ILogicalExpression>> assignedExprRefs = op.getExpressions();
+        List<LogicalVariable> assignedVars = op.getVariables();
+        // Maps assigning variables if assignment expressions are VariableReferenceExpressions.
+        for (int index = 0; index < assignedVars.size(); ++index) {
+            ILogicalExpression expr = assignedExprRefs.get(index).getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
+                LogicalVariable sourceVar = varExpr.getVariableReference();
+                updateInputToOutputVarMapping(sourceVar, assignedVars.get(index), false);
+            }
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        // Adds all missing variables that should propagates up.
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            if (!op.getVariables().contains(keyVar)) {
+                op.getVariables().add(keyVar);
+            }
+        }
+        return op;
+
+    }
+
+    @Override
+    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
+            throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
+    }
+
+    @Override
+    public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        visitMultiInputOperator(op);
+        // Update the variable mappings
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = op.getVariableMappings();
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varTriples) {
+            updateInputToOutputVarMapping(triple.third, triple.first, false);
+            updateInputToOutputVarMapping(triple.second, triple.first, false);
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        if (!liveVars.containsAll(correlatedKeyVars)) {
+            op.setPropagatesInput(true);
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        List<LogicalVariable> distinctVarList = op.getDistinctByVarList();
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            if (!distinctVarList.contains(keyVar)) {
+                distinctVarList.add(keyVar);
+            }
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg)
+            throws AlgebricksException {
+        visitSingleInputOperator(op);
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        if (!liveVars.containsAll(correlatedKeyVars)) {
+            op.setPropagateInput(true);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    /**
+     * Wraps an AggregateOperator or RunningAggregateOperator with a group-by operator where
+     * the group-by keys are variables in keyVarsToEnforce.
+     * Note that the function here prevents this visitor being used to rewrite arbitrary query plans.
+     * Instead, it could only be used for rewriting a nested plan within a subplan operator.
+     *
+     * @param op
+     *            the logical operator for aggregate or running aggregate.
+     * @param keyVarsToEnforce
+     *            the set of variables that needs to preserve.
+     * @return the wrapped group-by operator if {@code keyVarsToEnforce} is not empty, and {@code op} otherwise.
+     * @throws AlgebricksException
+     */
+    private ILogicalOperator visitAggregateOperator(ILogicalOperator op) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        if (correlatedKeyVars.isEmpty()) {
+            return op;
+        }
+        GroupByOperator gbyOp = new GroupByOperator();
+        for (LogicalVariable keyVar : correlatedKeyVars) {
+            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
+            // where the keyVarsToEnforce forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
+            LogicalVariable newVar = context.newVar();
+            gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyVar))));
+            updateInputToOutputVarMapping(keyVar, newVar, false);
+        }
+
+        ILogicalOperator inputOp = op.getInputs().get(0).getValue();
+        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
+
+        NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
+        op.getInputs().clear();
+        op.getInputs().add(new MutableObject<ILogicalOperator>(nts));
+
+        ILogicalPlan nestedPlan = new ALogicalPlanImpl();
+        nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(op));
+        gbyOp.getNestedPlans().add(nestedPlan);
+
+        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(gbyOp, context);
+        return op;
+    }
+
+    private ILogicalOperator visitMultiInputOperator(ILogicalOperator op) throws AlgebricksException {
+        orderingExprs.clear();
+        Set<LogicalVariable> keyVarsForCurrentBranch = new HashSet<LogicalVariable>();
+        for (int i = op.getInputs().size() - 1; i >= 0; --i) {
+            // Stores key variables for the previous branch.
+            keyVarsForCurrentBranch.addAll(correlatedKeyVars);
+            correlatedKeyVars.clear();
+
+            // Deals with single input operators.
+            ILogicalOperator newChild = op.getInputs().get(i).getValue().accept(this, null);
+            op.getInputs().get(i).setValue(newChild);
+
+            if (correlatedKeyVars.isEmpty()) {
+                correlatedKeyVars.addAll(keyVarsForCurrentBranch);
+            }
+            keyVarsForCurrentBranch.clear();
+        }
+        subtituteVariables(op);
+        return op;
+    }
+
+    private ILogicalOperator visitSingleInputOperator(ILogicalOperator op) throws AlgebricksException {
+        if (op.getInputs().size() == 1) {
+            // Deals with single input operators.
+            ILogicalOperator newChild = op.getInputs().get(0).getValue().accept(this, null);
+            op.getInputs().get(0).setValue(newChild);
+        }
+        subtituteVariables(op);
+        return op;
+    }
+
+    private void subtituteVariables(ILogicalOperator op) throws AlgebricksException {
+        VariableUtilities.substituteVariables(op, subplanInputVarToCurrentVarMap, context);
+        for (Pair<LogicalVariable, LogicalVariable> pair : varMapIntroducedByRewriting) {
+            VariableUtilities.substituteVariables(op, pair.first, pair.second, context);
+        }
+    }
+
+    private void updateInputToOutputVarMapping(LogicalVariable oldVar, LogicalVariable newVar, boolean inNts) {
+        if (correlatedKeyVars.contains(oldVar)) {
+            correlatedKeyVars.remove(oldVar);
+            correlatedKeyVars.add(newVar);
+        }
+
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : orderingExprs) {
+            orderExpr.second.getValue().substituteVar(oldVar, newVar);
+        }
+
+        if (currentVarToSubplanInputVarMap.containsKey(oldVar)) {
+            // Find the original mapped var.
+            oldVar = currentVarToSubplanInputVarMap.get(oldVar);
+        }
+        if (subplanInputVarToCurrentVarMap.containsKey(oldVar) || inNts) {
+            subplanInputVarToCurrentVarMap.put(oldVar, newVar);
+            currentVarToSubplanInputVarMap.put(newVar, oldVar);
+        } else {
+            varMapIntroducedByRewriting.add(new Pair<LogicalVariable, LogicalVariable>(oldVar, newVar));
+        }
+    }
+
+    private List<Pair<IOrder, Mutable<ILogicalExpression>>> cloneOrderingExpression(
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs) {
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> clonedOrderExprs = new ArrayList<>();
+        for (Pair<IOrder, Mutable<ILogicalExpression>> orderExpr : orderExprs) {
+            clonedOrderExprs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(orderExpr.first,
+                    new MutableObject<ILogicalExpression>(orderExpr.second.getValue().cloneExpression())));
+        }
+        return clonedOrderExprs;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/947fc3cb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
new file mode 100644
index 0000000..191a2f9
--- /dev/null
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.subplan;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
+
+/*
+    This visitor inlines the input <code>nts</code> in the query plan rooted
+    at the operator being visited, with the query plan rooted at the input
+    <code>subplanInputOperator</code>.
+
+    The visitor ensures that:
+    1. live variables at <code>subplanInputOperator</code> are
+    propagated to the top-most join operator in the query plan rooted
+    at the operator being visited;
+    2. no available tuple at <code>subplanInputOperator</code> get lost along the
+    pipeline to the top-most join operator in the query plan rooted
+    at the operator being visited.
+*/
+class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisitor<ILogicalOperator, Void> {
+    // The optimization context.
+    private final IOptimizationContext context;
+
+    // The input operator to the subplan.
+    private final ILogicalOperator subplanInputOperator;
+
+    // The target Nts operator.
+    private final ILogicalOperator targetNts;
+
+    // The live variables in <code>subplanInputOperator</code> to enforce.
+    private final Set<LogicalVariable> liveVarsFromSubplanInput = new HashSet<>();
+
+    // The state indicate if the operator tree rooted at the current operator is rewritten.
+    private boolean rewritten = false;
+
+    // The state indicate if the operator tree rooted at the current operator contains a rewritten join.
+    private boolean hasJoinAncestor = false;
+
+    // A set of variables that are needed for not-null checks in the final group-by operator.
+    private Set<LogicalVariable> nullCheckVars = new HashSet<LogicalVariable>();
+
+    // The top join reference.
+    private Mutable<ILogicalOperator> topJoinRef;
+
+    /***
+     * @param context
+     *            the optimization context
+     * @param subplanInputOperator
+     *            the input operator to the target SubplanOperator
+     * @param nts
+     *            the NestedTupleSourceOperator to be replaced by <code>subplanInputOperator</code>
+     * @throws AlgebricksException
+     */
+    public InlineLeftNtsInSubplanJoinFlatteningVisitor(IOptimizationContext context,
+            ILogicalOperator subplanInputOperator, ILogicalOperator nts) throws AlgebricksException {
+        this.context = context;
+        this.subplanInputOperator = subplanInputOperator;
+        this.targetNts = nts;
+        VariableUtilities.getSubplanLocalLiveVariables(subplanInputOperator, liveVarsFromSubplanInput);
+    }
+
+    /**
+     * @return a set of variables indicating whether a tuple from the right
+     *         branch of a left-outer join is a non-match.
+     */
+    public Set<LogicalVariable> getNullCheckVariables() {
+        return nullCheckVars;
+    }
+
+    /**
+     * @return the top-most join operator after visiting the query plan rooted
+     *         at the operator being visited.
+     */
+    public Mutable<ILogicalOperator> getTopJoinReference() {
+        return topJoinRef;
+    }
+
+    @Override
+    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op, Void arg)
+            throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitLimitOperator(LimitOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws AlgebricksException {
+        hasJoinAncestor = true;
+        boolean needToSwitch = false;
+        for (int i = 0; i < op.getInputs().size(); ++i) {
+            // Deals with single input operators.
+            ILogicalOperator newChild = op.getInputs().get(i).getValue().accept(this, null);
+            op.getInputs().get(i).setValue(newChild);
+            if (i == 1) {
+                needToSwitch = true;
+            }
+            if (rewritten) {
+                break;
+            }
+        }
+
+        // Checks whether there is a need to switch two join branches.
+        if (rewritten && needToSwitch) {
+            Mutable<ILogicalOperator> leftBranch = op.getInputs().get(0);
+            Mutable<ILogicalOperator> rightBranch = op.getInputs().get(1);
+            op.getInputs().set(0, rightBranch);
+            op.getInputs().set(1, leftBranch);
+        }
+        AbstractBinaryJoinOperator returnOp = op;
+        // After rewriting, the original inner join should become an left outer join.
+        if (rewritten) {
+            returnOp = new LeftOuterJoinOperator(op.getCondition());
+            returnOp.getInputs().addAll(op.getInputs());
+            injectNullCheckVars(returnOp);
+        }
+        return returnOp;
+    }
+
+    @Override
+    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) throws AlgebricksException {
+        hasJoinAncestor = true;
+        // Only rewrites the left child.
+        ILogicalOperator newChild = op.getInputs().get(0).getValue().accept(this, null);
+        op.getInputs().get(0).setValue(newChild);
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
+            throws AlgebricksException {
+        if (op == targetNts) {
+            // Inlines the actual <code>subplanInputOperator</code>.
+            rewritten = true;
+            return subplanInputOperator;
+        } else {
+            return op;
+        }
+    }
+
+    @Override
+    public ILogicalOperator visitOrderOperator(OrderOperator op, Void arg) throws AlgebricksException {
+        boolean underJoin = hasJoinAncestor;
+        visitSingleInputOperator(op);
+        if (!rewritten || !underJoin) {
+            return op;
+        }
+
+        // Adjust the ordering if its input operator pipeline has been rewritten.
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprList = new ArrayList<>();
+        // Adds keyVars to the prefix of sorting columns.
+        for (LogicalVariable liveVar : liveVarsFromSubplanInput) {
+            orderExprList.add(new Pair<IOrder, Mutable<ILogicalExpression>>(OrderOperator.ASC_ORDER,
+                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
+        }
+        orderExprList.addAll(op.getOrderExpressions());
+
+        // Creates an order operator with the new expression list.
+        OrderOperator orderOp = new OrderOperator(orderExprList);
+        orderOp.getInputs().addAll(op.getInputs());
+        context.computeAndSetTypeEnvironmentForOperator(orderOp);
+        return orderOp;
+    }
+
+    @Override
+    public ILogicalOperator visitAssignOperator(AssignOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitSelectOperator(SelectOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        boolean underJoin = hasJoinAncestor;
+        visitSingleInputOperator(op);
+        if (!rewritten || !underJoin) {
+            return op;
+        }
+        // Adds all missing variables that should propagates up.
+        for (LogicalVariable keyVar : liveVarsFromSubplanInput) {
+            if (!op.getVariables().contains(keyVar)) {
+                op.getVariables().add(keyVar);
+            }
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
+            throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitScriptOperator(ScriptOperator op, Void arg) throws AlgebricksException {
+        throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
+    }
+
+    @Override
+    public ILogicalOperator visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitUnionOperator(UnionAllOperator op, Void arg) throws AlgebricksException {
+        throw new UnsupportedOperationException(
+                "Nested subplans with a union operator should have been disqualified for this rewriting!");
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
+        visitSingleInputOperator(op);
+        if (!rewritten) {
+            return op;
+        }
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        if (!liveVars.containsAll(liveVarsFromSubplanInput)) {
+            op.setPropagatesInput(true);
+        }
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitDistinctOperator(DistinctOperator op, Void arg) throws AlgebricksException {
+        boolean underJoin = hasJoinAncestor;
+        visitSingleInputOperator(op);
+        if (!rewritten || !underJoin) {
+            return op;
+        }
+        List<LogicalVariable> distinctVarList = op.getDistinctByVarList();
+        for (LogicalVariable keyVar : liveVarsFromSubplanInput) {
+            if (!distinctVarList.contains(keyVar)) {
+                distinctVarList.add(keyVar);
+            }
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    @Override
+    public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg)
+            throws AlgebricksException {
+        visitSingleInputOperator(op);
+        if (!rewritten) {
+            return op;
+        }
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(op, liveVars);
+        if (!liveVars.containsAll(liveVarsFromSubplanInput)) {
+            op.setPropagateInput(true);
+        }
+        context.computeAndSetTypeEnvironmentForOperator(op);
+        return op;
+    }
+
+    @Override
+    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
+    private ILogicalOperator visitSingleInputOperator(ILogicalOperator op) throws AlgebricksException {
+        if (op.getInputs().size() == 1) {
+            // Deals with single input operators.
+            Mutable<ILogicalOperator> childRef = op.getInputs().get(0);
+            ILogicalOperator newChild = childRef.getValue().accept(this, null);
+            if (topJoinRef == null) {
+                LogicalOperatorTag childOpTag = newChild.getOperatorTag();
+                if (childOpTag == LogicalOperatorTag.INNERJOIN || childOpTag == LogicalOperatorTag.LEFTOUTERJOIN) {
+                    topJoinRef = childRef;
+                }
+            }
+            op.getInputs().get(0).setValue(newChild);
+        }
+        return op;
+    }
+
+    /**
+     * Inject varaibles to indicate non-matches for the right branch of
+     * a left-outer join.
+     *
+     * @param joinOp
+     *            the leftouter join operator.
+     */
+    private void injectNullCheckVars(AbstractBinaryJoinOperator joinOp) {
+        LogicalVariable assignVar = context.newVar();
+        ILogicalOperator assignOp = new AssignOperator(assignVar,
+                new MutableObject<ILogicalExpression>(ConstantExpression.TRUE));
+        assignOp.getInputs().add(joinOp.getInputs().get(1));
+        joinOp.getInputs().set(1, new MutableObject<ILogicalOperator>(assignOp));
+        nullCheckVars.add(assignVar);
+    }
+
+}