You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/03/10 23:14:09 UTC

[2/2] incubator-asterixdb git commit: ASTERIXDB-1168: use either primary key or generated id for decorrelation.

ASTERIXDB-1168: use either primary key or generated id for decorrelation.

Change-Id: Ib5e232f3ba99018cd1aedfa4f8bb2f98affa0f55
Reviewed-on: https://asterix-gerrit.ics.uci.edu/696
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Jianfeng Jia <ji...@gmail.com>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/ae555bab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/ae555bab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/ae555bab

Branch: refs/heads/master
Commit: ae555bab68948ce15057827633bb4c3ea10d93a9
Parents: 4b96348
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Wed Mar 9 21:29:32 2016 -0800
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Thu Mar 10 14:09:04 2016 -0800

----------------------------------------------------------------------
 ...quivalenceClassForRecordConstructorRule.java |   5 +-
 .../optimizer/rules/UnnestToDataScanRule.java   |  24 +--
 .../rules/am/IntroduceJoinAccessMethodRule.java |   3 +-
 .../am/IntroduceSelectAccessMethodRule.java     |   5 +-
 .../rules/am/OptimizableOperatorSubTree.java    |  16 +-
 .../subplan/InlineAllNtsInSubplanVisitor.java   |  32 ++-
 ...ineSubplanInputForNestedTupleSourceRule.java |  47 +++--
 .../rules/util/EquivalenceClassUtils.java       |  84 ++++----
 .../queries/query-ASTERIXDB-1168.aql            |  33 ++++
 .../queries/udfs/query-ASTERIXDB-1020.aql       |   2 +-
 ...join-probe-sidx-with-join-btree-sidx_01.plan |  64 +++---
 .../results/query-ASTERIXDB-1168.plan           |  31 +++
 .../optimizerts/results/query-issue562.plan     |  87 ++++-----
 .../results/udfs/query-ASTERIXDB-1018.plan      |  76 ++++----
 .../results/udfs/query-ASTERIXDB-1019.plan      |  76 ++++----
 .../results/udfs/query-ASTERIXDB-1020.plan      |  91 ++++-----
 .../decorrelate_with_unique_id.1.ddl.sqlpp      |  55 ++++++
 .../decorrelate_with_unique_id.2.update.sqlpp   |  18 ++
 .../decorrelate_with_unique_id.3.query.sqlpp    |  32 +++
 .../decorrelate_with_unique_id_2.1.ddl.sqlpp    |  55 ++++++
 .../decorrelate_with_unique_id_2.2.update.sqlpp |  18 ++
 .../decorrelate_with_unique_id_2.3.query.sqlpp  |  39 ++++
 .../gby_inline/gby_inline.3.query.sqlpp         |   3 +-
 .../query-ASTERIXDB-1168.1.ddl.sqlpp            |  27 +++
 .../query-ASTERIXDB-1168.2.update.sqlpp         |  29 +++
 .../query-ASTERIXDB-1168.3.query.sqlpp          |  27 +++
 .../decorrelate_with_unique_id.1.adm            | 150 ++++++++++++++
 .../decorrelate_with_unique_id_2.1.adm          |   9 +
 .../subquery/gby_inline/gby_inline.1.adm        | 194 +++++++++----------
 .../query-ASTERIXDB-1168.1.adm                  |   5 +
 .../resources/runtimets/testsuite_sqlpp.xml     |  15 ++
 .../metadata/declared/AqlDataSource.java        |   8 +
 32 files changed, 979 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AddEquivalenceClassForRecordConstructorRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AddEquivalenceClassForRecordConstructorRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AddEquivalenceClassForRecordConstructorRule.java
index 218146f..b2271a1 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AddEquivalenceClassForRecordConstructorRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/AddEquivalenceClassForRecordConstructorRule.java
@@ -37,6 +37,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
@@ -85,7 +86,7 @@ public class AddEquivalenceClassForRecordConstructorRule implements IAlgebraicRe
         for (int exprIndex = 0; exprIndex < exprRefs.size(); ++exprIndex) {
             ILogicalExpression expr = exprRefs.get(exprIndex).getValue();
             if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
-                ScalarFunctionCallExpression funcExpr = (ScalarFunctionCallExpression) expr;
+                AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
                 FunctionIdentifier fid = funcExpr.getFunctionIdentifier();
                 if (fid == AsterixBuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR
                         || fid == AsterixBuiltinFunctions.OPEN_RECORD_CONSTRUCTOR) {
@@ -99,7 +100,7 @@ public class AddEquivalenceClassForRecordConstructorRule implements IAlgebraicRe
 
     @SuppressWarnings("unchecked")
     private boolean propagateEquivalenceClassesForRecordConstructor(LogicalVariable recordVar,
-            ScalarFunctionCallExpression funcExpr, AssignOperator assignOp, IOptimizationContext context) {
+            AbstractFunctionCallExpression funcExpr, AssignOperator assignOp, IOptimizationContext context) {
         List<Mutable<ILogicalExpression>> argRefs = funcExpr.getArguments();
         boolean changed = false;
         // Only odd position arguments are field value expressions.

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 5af6ccc..1f965d6 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -131,7 +131,7 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
                 List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
                 scanInpList.addAll(unnest.getInputs());
                 opRef.setValue(scan);
-                addPrimaryKey(variables, context);
+                addPrimaryKey(variables, dataSource, context);
                 context.computeAndSetTypeEnvironmentForOperator(scan);
 
                 // Adds equivalence classes --- one equivalent class between a primary key
@@ -173,15 +173,15 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
                 v.add(unnest.getVariable());
 
                 String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
-                DataSourceScanOperator scan = new DataSourceScanOperator(v,
-                        createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
-                                metadataProvider, policy, outputType,
-                                null /* TODO(Abdullah): to figure out the meta type name*/, csLocations));
+                AqlDataSource dataSource = createFeedDataSource(asid, targetDataset, sourceFeedName,
+                        subscriptionLocation, metadataProvider, policy, outputType,
+                        null /* TODO(Abdullah): to figure out the meta type name*/, csLocations);
+                DataSourceScanOperator scan = new DataSourceScanOperator(v, dataSource);
 
                 List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
                 scanInpList.addAll(unnest.getInputs());
                 opRef.setValue(scan);
-                addPrimaryKey(v, context);
+                addPrimaryKey(v, dataSource, context);
                 context.computeAndSetTypeEnvironmentForOperator(scan);
 
                 return true;
@@ -192,12 +192,12 @@ public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
         return false;
     }
 
-    public void addPrimaryKey(List<LogicalVariable> scanVariables, IOptimizationContext context) {
-        int n = scanVariables.size();
-        List<LogicalVariable> head = new ArrayList<LogicalVariable>(scanVariables.subList(0, n - 1));
-        List<LogicalVariable> tail = new ArrayList<LogicalVariable>(1);
-        tail.add(scanVariables.get(n - 1));
-        FunctionalDependency pk = new FunctionalDependency(head, tail);
+    private void addPrimaryKey(List<LogicalVariable> scanVariables, AqlDataSource dataSource,
+            IOptimizationContext context) {
+        List<LogicalVariable> primaryKey = dataSource.getPrimaryKeyVariables(scanVariables);
+        List<LogicalVariable> tail = new ArrayList<LogicalVariable>();
+        tail.addAll(scanVariables);
+        FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
         context.addPrimaryKey(pk);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
index fce84d1..8a9bb3f 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceJoinAccessMethodRule.java
@@ -218,7 +218,8 @@ public class IntroduceJoinAccessMethodRule extends AbstractIntroduceAccessMethod
         }
     }
 
-    protected boolean matchesOperatorPattern(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+    protected boolean matchesOperatorPattern(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         // First check that the operator is a join and its condition is a function call.
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
         if (context.checkIfInDontApplySet(this, op1)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
index bca7e04..5b9a9b6 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroduceSelectAccessMethodRule.java
@@ -137,7 +137,7 @@ public class IntroduceSelectAccessMethodRule extends AbstractIntroduceAccessMeth
 
     private boolean intersectAllSecondaryIndexes(List<Pair<IAccessMethod, Index>> chosenIndexes,
             Map<IAccessMethod, AccessMethodAnalysisContext> analyzedAMs, IOptimizationContext context)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         Pair<IAccessMethod, Index> chosenIndex = null;
         Optional<Pair<IAccessMethod, Index>> primaryIndex = chosenIndexes.stream()
                 .filter(pair -> pair.second.isPrimaryIndex()).findFirst();
@@ -205,7 +205,8 @@ public class IntroduceSelectAccessMethodRule extends AbstractIntroduceAccessMeth
         return lop;
     }
 
-    protected boolean matchesOperatorPattern(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+    protected boolean matchesOperatorPattern(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
         // First check that the operator is a select and its condition is a function call.
         AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
         if (context.checkIfInDontApplySet(this, op1)) {

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
index 4865710..8313504 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/OptimizableOperatorSubTree.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractScan
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 
 /**
  * Operator subtree that matches the following patterns, and provides convenient access to its nodes:
@@ -77,7 +78,7 @@ public class OptimizableOperatorSubTree {
     public List<Dataset> ixJoinOuterAdditionalDatasets = null;
     public List<ARecordType> ixJoinOuterAdditionalRecordTypes = null;
 
-    public boolean initFromSubTree(Mutable<ILogicalOperator> subTreeOpRef) {
+    public boolean initFromSubTree(Mutable<ILogicalOperator> subTreeOpRef) throws AlgebricksException {
         reset();
         rootRef = subTreeOpRef;
         root = subTreeOpRef.getValue();
@@ -96,11 +97,14 @@ public class OptimizableOperatorSubTree {
                 return initializeDataSource(subTreeOpRef);
             }
             // Match (assign | unnest)+.
-            while (subTreeOp.getOperatorTag() == LogicalOperatorTag.ASSIGN
-                    || subTreeOp.getOperatorTag() == LogicalOperatorTag.UNNEST) {
-                assignsAndUnnestsRefs.add(subTreeOpRef);
-                assignsAndUnnests.add(subTreeOp);
-
+            while ((subTreeOp.getOperatorTag() == LogicalOperatorTag.ASSIGN
+                    || subTreeOp.getOperatorTag() == LogicalOperatorTag.UNNEST)) {
+                if (OperatorPropertiesUtil.isStatefulAssign(subTreeOp)) {
+                    return false;
+                } else {
+                    assignsAndUnnestsRefs.add(subTreeOpRef);
+                    assignsAndUnnests.add(subTreeOp);
+                }
                 subTreeOpRef = subTreeOp.getInputs().get(0);
                 subTreeOp = (AbstractLogicalOperator) subTreeOpRef.getValue();
             };

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 10ef1f6..09dd2b2 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -19,11 +19,13 @@
 package org.apache.asterix.optimizer.rules.subplan;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.asterix.lang.common.util.FunctionUtil;
@@ -79,6 +81,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
+import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
 
@@ -418,10 +421,11 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper
 
         // Updates the primary key info in the copied plan segment.
         Map<LogicalVariable, LogicalVariable> varMap = deepCopyVisitor.getInputToOutputVariableMapping();
-        context.updatePrimaryKeys(varMap);
-
+        addPrimaryKeys(varMap);
+        Pair<ILogicalOperator, Set<LogicalVariable>> primaryOpAndVars = EquivalenceClassUtils
+                .findOrCreatePrimaryKeyOpAndVariables(copiedInputOperator, true, context);
         correlatedKeyVars.clear();
-        correlatedKeyVars.addAll(EquivalenceClassUtils.findFDHeaderVariables(context, subplanInputOperator));
+        correlatedKeyVars.addAll(primaryOpAndVars.second);
         // Update key variables and input-output-var mapping.
         for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
             LogicalVariable oldVar = entry.getKey();
@@ -432,7 +436,7 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper
             }
             updateInputToOutputVarMapping(oldVar, newVar, true);
         }
-        return copiedInputOperator;
+        return primaryOpAndVars.first;
     }
 
     @Override
@@ -726,4 +730,24 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper
         return clonedOrderExprs;
     }
 
+    private void addPrimaryKeys(Map<LogicalVariable, LogicalVariable> varMap) {
+        for (Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+            List<LogicalVariable> dependencyVars = context.findPrimaryKey(entry.getKey());
+            if (dependencyVars == null) {
+                // No key dependencies
+                continue;
+            }
+            List<LogicalVariable> newDependencies = new ArrayList<>();
+            for (LogicalVariable dependencyVar : dependencyVars) {
+                LogicalVariable newDependencyVar = varMap.get(dependencyVar);
+                if (newDependencyVar == null) {
+                    continue;
+                }
+                newDependencies.add(newDependencyVar);
+            }
+            context.addPrimaryKey(
+                    new FunctionalDependency(newDependencies, Collections.singletonList(entry.getValue())));
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
index 9f03ffa..c018e8e 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineSubplanInputForNestedTupleSourceRule.java
@@ -337,20 +337,25 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
             return traverseNonSubplanOperator(subplanOp, context);
         }
         Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0);
-        ILogicalOperator inputOp = inputOpRef.getValue();
+        ILogicalOperator inputOpBackup = inputOpRef.getValue();
+        // Creates parameters for the left outer join operator.
+        Pair<ILogicalOperator, Set<LogicalVariable>> primaryOpAndVars = EquivalenceClassUtils
+                .findOrCreatePrimaryKeyOpAndVariables(inputOpBackup, true, context);
+        ILogicalOperator inputOp = primaryOpAndVars.first;
+        Set<LogicalVariable> primaryKeyVars = primaryOpAndVars.second;
+        inputOpRef.setValue(inputOp);
+        Set<LogicalVariable> inputLiveVars = new HashSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(inputOp, inputLiveVars);
+
         Pair<Map<LogicalVariable, LogicalVariable>, List<Pair<IOrder, Mutable<ILogicalExpression>>>> varMapAndOrderExprs = SubplanFlatteningUtil
                 .inlineAllNestedTupleSource(subplanOp, context);
         Map<LogicalVariable, LogicalVariable> varMap = varMapAndOrderExprs.first;
         if (varMap == null) {
+            inputOpRef.setValue(inputOpBackup);
             // Traverses the operator as if it is not a subplan.
             return traverseNonSubplanOperator(subplanOp, context);
         }
 
-        // Creates parameters for the left outer join operator.
-        Set<LogicalVariable> inputLiveVars = new HashSet<LogicalVariable>();
-        VariableUtilities.getLiveVariables(inputOp, inputLiveVars);
-        Set<LogicalVariable> fdCoveringVars = EquivalenceClassUtils.findFDHeaderVariables(context, inputOp);
-
         Mutable<ILogicalOperator> rightInputOpRef = subplanOp.getNestedPlans().get(0).getRoots().get(0).getValue()
                 .getInputs().get(0);
         ILogicalOperator rightInputOp = rightInputOpRef.getValue();
@@ -365,7 +370,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
 
         // Constructs the join predicate for the leftOuter join.
         List<Mutable<ILogicalExpression>> joinPredicates = new ArrayList<Mutable<ILogicalExpression>>();
-        for (LogicalVariable liveVar : fdCoveringVars) {
+        for (LogicalVariable liveVar : primaryKeyVars) {
             List<Mutable<ILogicalExpression>> arguments = new ArrayList<Mutable<ILogicalExpression>>();
             arguments.add(new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar)));
             LogicalVariable rightVar = varMap.get(liveVar);
@@ -378,7 +383,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
         ILogicalExpression joinExpr = joinPredicates.size() > 1
                 ? new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(AlgebricksBuiltinFunctions.AND),
                         joinPredicates)
-                : joinPredicates.get(0).getValue();
+                : joinPredicates.size() > 0 ? joinPredicates.get(0).getValue() : ConstantExpression.TRUE;
         LeftOuterJoinOperator leftOuterJoinOp = new LeftOuterJoinOperator(
                 new MutableObject<ILogicalExpression>(joinExpr), inputOpRef, rightInputOpRef);
         OperatorManipulationUtil.computeTypeEnvironmentBottomUp(rightInputOp, context);
@@ -391,7 +396,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
         GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, nestedPlans);
 
         Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<>();
-        for (LogicalVariable liveVar : fdCoveringVars) {
+        for (LogicalVariable liveVar : primaryKeyVars) {
             LogicalVariable newVar = context.newVar();
             groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
                     new MutableObject<ILogicalExpression>(new VariableReferenceExpression(liveVar))));
@@ -399,7 +404,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
             replacedVarMap.put(liveVar, newVar);
         }
         for (LogicalVariable liveVar : inputLiveVars) {
-            if (fdCoveringVars.contains(liveVar)) {
+            if (primaryKeyVars.contains(liveVar)) {
                 continue;
             }
             groupByDecorList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
@@ -458,7 +463,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
     private Pair<Boolean, Map<LogicalVariable, LogicalVariable>> applySpecialFlattening(Mutable<ILogicalOperator> opRef,
             IOptimizationContext context) throws AlgebricksException {
         SubplanOperator subplanOp = (SubplanOperator) opRef.getValue();
-        ILogicalOperator inputOp = subplanOp.getInputs().get(0).getValue();
+        Mutable<ILogicalOperator> inputOpRef = subplanOp.getInputs().get(0);
         Map<LogicalVariable, LogicalVariable> replacedVarMap = new HashMap<>();
 
         // Recursively applies this rule to the nested plan of the subplan operator,
@@ -466,27 +471,33 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
         Pair<Boolean, Map<LogicalVariable, LogicalVariable>> result = rewriteSubplanOperator(
                 subplanOp.getNestedPlans().get(0).getRoots().get(0), context);
 
+        ILogicalOperator inputOpBackup = inputOpRef.getValue();
+        // Gets live variables and covering variables from the subplan's input operator.
+        Pair<ILogicalOperator, Set<LogicalVariable>> primaryOpAndVars = EquivalenceClassUtils
+                .findOrCreatePrimaryKeyOpAndVariables(inputOpBackup, false, context);
+        ILogicalOperator inputOp = primaryOpAndVars.first;
+        Set<LogicalVariable> primaryKeyVars = primaryOpAndVars.second;
+        inputOpRef.setValue(inputOp);
+        Set<LogicalVariable> liveVars = new HashSet<>();
+        VariableUtilities.getLiveVariables(inputOp, liveVars);
+
         Pair<Set<LogicalVariable>, Mutable<ILogicalOperator>> notNullVarsAndTopJoinRef = SubplanFlatteningUtil
                 .inlineLeftNtsInSubplanJoin(subplanOp, context);
         if (notNullVarsAndTopJoinRef.first == null) {
+            inputOpRef.setValue(inputOpBackup);
             return new Pair<Boolean, Map<LogicalVariable, LogicalVariable>>(false, replacedVarMap);
         }
 
         Set<LogicalVariable> notNullVars = notNullVarsAndTopJoinRef.first;
         Mutable<ILogicalOperator> topJoinRef = notNullVarsAndTopJoinRef.second;
 
-        // Gets live variables and covering variables from the subplan's input operator.
-        Set<LogicalVariable> fdCoveringVars = EquivalenceClassUtils.findFDHeaderVariables(context, inputOp);
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(inputOp, liveVars);
-
         // Creates a group-by operator.
         List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
         List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> groupByDecorList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
         GroupByOperator groupbyOp = new GroupByOperator(groupByList, groupByDecorList, subplanOp.getNestedPlans());
 
         Map<LogicalVariable, LogicalVariable> gbyVarMap = new HashMap<LogicalVariable, LogicalVariable>();
-        for (LogicalVariable coverVar : fdCoveringVars) {
+        for (LogicalVariable coverVar : primaryKeyVars) {
             LogicalVariable newVar = context.newVar();
             gbyVarMap.put(coverVar, newVar);
             groupByList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
@@ -495,7 +506,7 @@ public class InlineSubplanInputForNestedTupleSourceRule implements IAlgebraicRew
             replacedVarMap.put(coverVar, newVar);
         }
         for (LogicalVariable liveVar : liveVars) {
-            if (fdCoveringVars.contains(liveVar)) {
+            if (primaryKeyVars.contains(liveVar)) {
                 continue;
             }
             groupByDecorList.add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
----------------------------------------------------------------------
diff --git a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
index 242e15b..28f1fca 100644
--- a/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
+++ b/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/util/EquivalenceClassUtils.java
@@ -19,6 +19,8 @@
 
 package org.apache.asterix.optimizer.rules.util;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,20 +35,20 @@ import org.apache.asterix.om.base.AInt32;
 import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
 import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.expressions.StatefulFunctionCallExpression;
 import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.PrimaryKeyVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
@@ -133,48 +135,60 @@ public class EquivalenceClassUtils {
     /**
      * Find the header variables that can imply all subplan-local live variables at <code>operator</code>.
      *
-     * @param context
-     *            the optimization context.
      * @param operator
      *            the operator of interest.
-     * @return a set of covering variables that can imply all subplan-local live variables at <code>operator</code>.
+     * @param usedForCorrelationJoin
+     *            whether the generated primary key will be used for a join that recovers the correlation.
+     * @param context
+     *            the optimization context.
+     * @return Pair<ILogicalOperator, Set<LogicalVariable>>, an operator (which is either the original parameter
+     *         <code>operator</code> or a newly created operator) and
+     *         a set of primary key variables at the operator.
      * @throws AlgebricksException
      */
-    public static Set<LogicalVariable> findFDHeaderVariables(IOptimizationContext context, ILogicalOperator operator)
-            throws AlgebricksException {
-        PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses((AbstractLogicalOperator) operator, context);
-        List<FunctionalDependency> fds = context.getFDList(operator);
-        context.clearAllFDAndEquivalenceClasses();
+    public static Pair<ILogicalOperator, Set<LogicalVariable>> findOrCreatePrimaryKeyOpAndVariables(
+            ILogicalOperator operator, boolean usedForCorrelationJoin, IOptimizationContext context)
+                    throws AlgebricksException {
+        computePrimaryKeys(operator, context);
 
         Set<LogicalVariable> liveVars = new HashSet<>();
         VariableUtilities.getSubplanLocalLiveVariables(operator, liveVars);
 
-        Set<LogicalVariable> key = new HashSet<>();
-        Set<LogicalVariable> cover = new HashSet<>();
-        for (FunctionalDependency fd : fds) {
-            List<LogicalVariable> head = fd.getHead();
-            head.retainAll(liveVars);
-            key.addAll(head);
-            cover.addAll(fd.getTail());
-            if (cover.containsAll(liveVars)) {
-                return key;
+        Set<LogicalVariable> primaryKeyVars = new HashSet<>();
+        Set<LogicalVariable> noKeyVars = new HashSet<>();
+        for (LogicalVariable liveVar : liveVars) {
+            List<LogicalVariable> keyVars = context.findPrimaryKey(liveVar);
+            if (keyVars != null) {
+                keyVars.retainAll(liveVars);
+            }
+            if ((keyVars == null || keyVars.isEmpty())) {
+                noKeyVars.add(liveVar);
+            } else {
+                primaryKeyVars.addAll(keyVars);
             }
         }
-        if (cover.containsAll(liveVars)) {
-            return key;
+        primaryKeyVars.retainAll(liveVars);
+        if (primaryKeyVars.containsAll(noKeyVars)) {
+            return new Pair<ILogicalOperator, Set<LogicalVariable>>(operator, primaryKeyVars);
         } else {
-            IVariableTypeEnvironment env = context.getOutputTypeEnvironment(operator);
-            Set<LogicalVariable> keyVars = new HashSet<>();
-            for (LogicalVariable var : liveVars) {
-                IAType type = (IAType) env.getVarType(var);
-                ATypeTag typeTag = type.getTypeTag();
-                if (typeTag == ATypeTag.RECORD || typeTag == ATypeTag.ORDEREDLIST
-                        || typeTag == ATypeTag.UNORDEREDLIST) {
-                    continue;
-                }
-                keyVars.add(var);
-            }
-            return keyVars;
+            LogicalVariable assignVar = context.newVar();
+            ILogicalOperator assignOp = new AssignOperator(assignVar,
+                    new MutableObject<ILogicalExpression>(usedForCorrelationJoin
+                            ? new StatefulFunctionCallExpression(
+                                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CREATE_QUERY_UID), null)
+                            : new ScalarFunctionCallExpression(
+                                    FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.CREATE_QUERY_UID))));
+            assignOp.getInputs().add(new MutableObject<ILogicalOperator>(operator));
+            context.addPrimaryKey(new FunctionalDependency(Collections.singletonList(assignVar),
+                    new ArrayList<LogicalVariable>(liveVars)));
+            context.computeAndSetTypeEnvironmentForOperator(assignOp);
+            return new Pair<ILogicalOperator, Set<LogicalVariable>>(assignOp, Collections.singleton(assignVar));
         }
     }
+
+    private static void computePrimaryKeys(ILogicalOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        PrimaryKeyVariablesVisitor visitor = new PrimaryKeyVariablesVisitor();
+        PhysicalOptimizationsUtil.visitOperatorAndItsDescendants(op, visitor, ctx);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1168.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1168.aql b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1168.aql
new file mode 100644
index 0000000..4df51c6
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/queries/query-ASTERIXDB-1168.aql
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+drop dataverse test if exists;
+create dataverse test;
+use dataverse test;
+
+create type TType as closed
+{ id: int64, content: string }
+
+create dataset TData (TType) primary key id;
+
+
+let $ps := ["b","a", "b","c","c"]
+for $p in $ps
+return
+{ "p":$p, "match": for $x in dataset TData where $x.content = $p return $x.id }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/queries/udfs/query-ASTERIXDB-1020.aql
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/queries/udfs/query-ASTERIXDB-1020.aql b/asterix-app/src/test/resources/optimizerts/queries/udfs/query-ASTERIXDB-1020.aql
index 17e50df..c980445 100644
--- a/asterix-app/src/test/resources/optimizerts/queries/udfs/query-ASTERIXDB-1020.aql
+++ b/asterix-app/src/test/resources/optimizerts/queries/udfs/query-ASTERIXDB-1020.aql
@@ -50,7 +50,7 @@ for $emergency in dataset CHPReports
     let $dangerzone := create-circle($emergency.epicenter,$emergency.radius)
     where (some $user in dataset userLocations satisfies
     $user.user-id = $userid and spatial-intersect($dangerzone,$user.location))
-return { "shelter locations":"for $shelter in dataset tornadoShelters return $shelter.location"}
+return { "shelter locations": for $shelter in dataset tornadoShelters return $shelter.location}
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan b/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
index e40816f..654dcf1 100644
--- a/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/external-indexing/leftouterjoin-probe-sidx-with-join-btree-sidx_01.plan
@@ -2,40 +2,44 @@
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
-        -- SORT_MERGE_EXCHANGE [$$27(ASC), $$28(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$19, $$21]  |PARTITIONED|
-                  {
-                    -- AGGREGATE  |LOCAL|
-                      -- STREAM_SELECT  |LOCAL|
-                        -- NESTED_TUPLE_SOURCE  |LOCAL|
-                  }
+        -- SORT_MERGE_EXCHANGE [$$19(ASC) ]  |PARTITIONED|
+          -- STABLE_SORT [$$19(ASC)]  |PARTITIONED|
             -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-              -- STABLE_SORT [$$19(ASC), $$21(ASC), $$22(ASC)]  |PARTITIONED|
-                -- HASH_PARTITION_EXCHANGE [$$19, $$21]  |PARTITIONED|
-                  -- STREAM_SELECT  |PARTITIONED|
-                    -- ASSIGN  |PARTITIONED|
-                      -- STREAM_PROJECT  |PARTITIONED|
-                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                          -- EXTERNAL_LOOKUP  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- STABLE_SORT [$$34(ASC), $$35(ASC)]  |PARTITIONED|
+              -- STREAM_PROJECT  |PARTITIONED|
+                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                  -- PRE_CLUSTERED_GROUP_BY[$$26]  |PARTITIONED|
+                          {
+                            -- AGGREGATE  |LOCAL|
+                              -- STREAM_SELECT  |LOCAL|
+                                -- NESTED_TUPLE_SOURCE  |LOCAL|
+                          }
+                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                      -- STABLE_SORT [$$26(ASC), $$22(ASC)]  |PARTITIONED|
+                        -- HASH_PARTITION_EXCHANGE [$$26]  |PARTITIONED|
+                          -- STREAM_SELECT  |PARTITIONED|
+                            -- ASSIGN  |PARTITIONED|
+                              -- STREAM_PROJECT  |PARTITIONED|
                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
+                                  -- EXTERNAL_LOOKUP  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- BTREE_SEARCH  |PARTITIONED|
-                                        -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                      -- STABLE_SORT [$$34(ASC), $$35(ASC)]  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                           -- STREAM_PROJECT  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_SELECT  |PARTITIONED|
-                                                -- ASSIGN  |PARTITIONED|
-                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                    -- EXTERNAL_LOOKUP  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- STABLE_SORT [$$31(ASC), $$32(ASC)]  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- BTREE_SEARCH  |PARTITIONED|
+                                                -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                    -- ASSIGN  |PARTITIONED|
+                                                      -- STREAM_SELECT  |PARTITIONED|
+                                                        -- ASSIGN  |PARTITIONED|
                                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                            -- EXTERNAL_LOOKUP  |PARTITIONED|
                                                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- BTREE_SEARCH  |PARTITIONED|
+                                                                -- STABLE_SORT [$$31(ASC), $$32(ASC)]  |PARTITIONED|
                                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
-                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- BTREE_SEARCH  |PARTITIONED|
+                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                            -- ASSIGN  |PARTITIONED|
+                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1168.plan
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1168.plan b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1168.plan
new file mode 100644
index 0000000..b4d6934
--- /dev/null
+++ b/asterix-app/src/test/resources/optimizerts/results/query-ASTERIXDB-1168.plan
@@ -0,0 +1,31 @@
+-- DISTRIBUTE_RESULT  |PARTITIONED|
+  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+    -- STREAM_PROJECT  |PARTITIONED|
+      -- ASSIGN  |PARTITIONED|
+        -- STREAM_PROJECT  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+            -- PRE_CLUSTERED_GROUP_BY[$$12]  |PARTITIONED|
+                    {
+                      -- AGGREGATE  |LOCAL|
+                        -- STREAM_SELECT  |LOCAL|
+                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                    }
+              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                -- STABLE_SORT [$$12(ASC)]  |PARTITIONED|
+                  -- HASH_PARTITION_EXCHANGE [$$12]  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- HYBRID_HASH_JOIN [$$1][$$11]  |PARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$1]  |PARTITIONED|
+                            -- ASSIGN  |UNPARTITIONED|
+                              -- STREAM_PROJECT  |UNPARTITIONED|
+                                -- UNNEST  |UNPARTITIONED|
+                                  -- ASSIGN  |UNPARTITIONED|
+                                    -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
+                            -- STREAM_PROJECT  |PARTITIONED|
+                              -- ASSIGN  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan b/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
index 28625aa..59cfd60 100644
--- a/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/query-issue562.plan
@@ -3,66 +3,61 @@
     -- STREAM_PROJECT  |PARTITIONED|
       -- ASSIGN  |PARTITIONED|
         -- SORT_MERGE_EXCHANGE [$$7(ASC) ]  |PARTITIONED|
-          -- PRE_CLUSTERED_GROUP_BY[$$86]  |PARTITIONED|
+          -- PRE_CLUSTERED_GROUP_BY[$$83]  |PARTITIONED|
                   {
                     -- AGGREGATE  |LOCAL|
                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                   }
-            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$86(ASC)] HASH:[$$86]  |PARTITIONED|
-              -- SORT_GROUP_BY[$$59]  |PARTITIONED|
+            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$83(ASC)] HASH:[$$83]  |PARTITIONED|
+              -- SORT_GROUP_BY[$$11]  |PARTITIONED|
                       {
                         -- AGGREGATE  |LOCAL|
                           -- NESTED_TUPLE_SOURCE  |LOCAL|
                       }
                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                   -- STREAM_PROJECT  |PARTITIONED|
-                    -- ASSIGN  |PARTITIONED|
+                    -- STREAM_SELECT  |PARTITIONED|
                       -- STREAM_PROJECT  |PARTITIONED|
-                        -- STREAM_SELECT  |PARTITIONED|
-                          -- STREAM_PROJECT  |PARTITIONED|
-                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                              -- SORT_GROUP_BY[$$83]  |PARTITIONED|
+                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                          -- PRE_CLUSTERED_GROUP_BY[$$80]  |PARTITIONED|
+                                  {
+                                    -- AGGREGATE  |LOCAL|
+                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                  }
+                            -- HASH_PARTITION_MERGE_EXCHANGE MERGE:[$$80(ASC)] HASH:[$$80]  |PARTITIONED|
+                              -- PRE_CLUSTERED_GROUP_BY[$$75]  |PARTITIONED|
                                       {
                                         -- AGGREGATE  |LOCAL|
-                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                          -- STREAM_SELECT  |LOCAL|
+                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
                                       }
-                                -- HASH_PARTITION_EXCHANGE [$$83]  |PARTITIONED|
-                                  -- STREAM_PROJECT  |PARTITIONED|
+                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                  -- STABLE_SORT [$$75(ASC)]  |PARTITIONED|
                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                      -- PRE_CLUSTERED_GROUP_BY[$$65, $$11, $$62]  |PARTITIONED|
-                                              {
-                                                -- AGGREGATE  |LOCAL|
-                                                  -- STREAM_SELECT  |LOCAL|
-                                                    -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                              }
+                                      -- STREAM_PROJECT  |PARTITIONED|
                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                          -- STABLE_SORT [$$65(ASC), $$65(ASC), $$62(ASC)]  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- STABLE_SORT [$$65(ASC), $$65(ASC), $$62(ASC)]  |PARTITIONED|
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                      -- HYBRID_HASH_JOIN [$$68][$$69]  |PARTITIONED|
-                                                        -- HASH_PARTITION_EXCHANGE [$$68]  |PARTITIONED|
-                                                          -- ASSIGN  |PARTITIONED|
-                                                            -- ASSIGN  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- HYBRID_HASH_JOIN [$$65][$$11]  |PARTITIONED|
-                                                                  -- HASH_PARTITION_EXCHANGE [$$65]  |PARTITIONED|
-                                                                    -- UNNEST  |UNPARTITIONED|
-                                                                      -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
-                                                                  -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ASSIGN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                        -- HASH_PARTITION_EXCHANGE [$$69]  |PARTITIONED|
-                                                          -- ASSIGN  |PARTITIONED|
+                                          -- HYBRID_HASH_JOIN [$$68][$$69]  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$68]  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ASSIGN  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- HYBRID_HASH_JOIN [$$65][$$11]  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$65]  |PARTITIONED|
+                                                            -- UNNEST  |UNPARTITIONED|
+                                                              -- EMPTY_TUPLE_SOURCE  |UNPARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$11]  |PARTITIONED|
                                                             -- STREAM_PROJECT  |PARTITIONED|
-                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                  -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                            -- HASH_PARTITION_EXCHANGE [$$69]  |PARTITIONED|
+                                              -- ASSIGN  |PARTITIONED|
+                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1018.plan
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1018.plan b/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1018.plan
index 1e7e505..3b7f231 100644
--- a/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1018.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1018.plan
@@ -4,71 +4,69 @@
       -- ASSIGN  |PARTITIONED|
         -- STREAM_PROJECT  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-            -- HYBRID_HASH_JOIN [$$34][$$50]  |PARTITIONED|
+            -- HYBRID_HASH_JOIN [$$34][$$47]  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                     -- DATASOURCE_SCAN  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                         -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-              -- HASH_PARTITION_EXCHANGE [$$50]  |PARTITIONED|
+              -- HASH_PARTITION_EXCHANGE [$$47]  |PARTITIONED|
                 -- STREAM_PROJECT  |PARTITIONED|
                   -- ASSIGN  |PARTITIONED|
                     -- STREAM_PROJECT  |PARTITIONED|
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- PRE_CLUSTERED_GROUP_BY[$$55, $$24, $$54]  |PARTITIONED|
+                        -- PRE_CLUSTERED_GROUP_BY[$$51, $$24, $$52]  |PARTITIONED|
                                 {
                                   -- AGGREGATE  |LOCAL|
                                     -- STREAM_SELECT  |LOCAL|
                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                 }
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- STABLE_SORT [$$55(ASC), $$24(ASC), $$54(ASC)]  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$55, $$24, $$54]  |PARTITIONED|
+                            -- STABLE_SORT [$$51(ASC), $$24(ASC), $$52(ASC)]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$51, $$24, $$52]  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- NESTED_LOOP  |PARTITIONED|
                                       -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- STREAM_SELECT  |PARTITIONED|
-                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                              -- PRE_CLUSTERED_GROUP_BY[$$49, $$35]  |PARTITIONED|
-                                                      {
-                                                        -- AGGREGATE  |LOCAL|
-                                                          -- STREAM_SELECT  |LOCAL|
-                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                      }
-                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                  -- STABLE_SORT [$$49(ASC), $$35(ASC)]  |PARTITIONED|
-                                                    -- HASH_PARTITION_EXCHANGE [$$49, $$35]  |PARTITIONED|
-                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                          -- HYBRID_HASH_JOIN [$$47][$$38]  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$47]  |PARTITIONED|
-                                                              -- NESTED_LOOP  |PARTITIONED|
-                                                                -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                    -- ASSIGN  |PARTITIONED|
-                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                        -- STREAM_SELECT  |PARTITIONED|
+                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                            -- PRE_CLUSTERED_GROUP_BY[$$35, $$46]  |PARTITIONED|
+                                                    {
+                                                      -- AGGREGATE  |LOCAL|
+                                                        -- STREAM_SELECT  |LOCAL|
+                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                    }
+                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                -- STABLE_SORT [$$35(ASC), $$46(ASC)]  |PARTITIONED|
+                                                  -- HASH_PARTITION_EXCHANGE [$$35, $$46]  |PARTITIONED|
+                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- HYBRID_HASH_JOIN [$$44][$$38]  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$44]  |PARTITIONED|
+                                                            -- NESTED_LOOP  |PARTITIONED|
+                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
                                                                   -- ASSIGN  |PARTITIONED|
-                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                      -- ASSIGN  |PARTITIONED|
+                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                            -- HASH_PARTITION_EXCHANGE [$$38]  |PARTITIONED|
-                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                -- ASSIGN  |PARTITIONED|
-                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ASSIGN  |PARTITIONED|
                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                       -- DATASOURCE_SCAN  |PARTITIONED|
                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                          -- HASH_PARTITION_EXCHANGE [$$38]  |PARTITIONED|
+                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                              -- ASSIGN  |PARTITIONED|
+                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- STREAM_PROJECT  |PARTITIONED|
                                           -- ASSIGN  |PARTITIONED|

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan b/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
index 3bf96d9..db30fa1 100644
--- a/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1019.plan
@@ -12,71 +12,69 @@
                       -- ASSIGN  |PARTITIONED|
                         -- STREAM_PROJECT  |PARTITIONED|
                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- HYBRID_HASH_JOIN [$$38][$$54]  |PARTITIONED|
+                            -- HYBRID_HASH_JOIN [$$38][$$51]  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                     -- DATASOURCE_SCAN  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                         -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$54]  |PARTITIONED|
+                              -- HASH_PARTITION_EXCHANGE [$$51]  |PARTITIONED|
                                 -- STREAM_PROJECT  |PARTITIONED|
                                   -- ASSIGN  |PARTITIONED|
                                     -- STREAM_PROJECT  |PARTITIONED|
                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- PRE_CLUSTERED_GROUP_BY[$$59, $$24, $$58]  |PARTITIONED|
+                                        -- PRE_CLUSTERED_GROUP_BY[$$56, $$24, $$55]  |PARTITIONED|
                                                 {
                                                   -- AGGREGATE  |LOCAL|
                                                     -- STREAM_SELECT  |LOCAL|
                                                       -- NESTED_TUPLE_SOURCE  |LOCAL|
                                                 }
                                           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- STABLE_SORT [$$59(ASC), $$24(ASC), $$58(ASC)]  |PARTITIONED|
-                                              -- HASH_PARTITION_EXCHANGE [$$59, $$24, $$58]  |PARTITIONED|
+                                            -- STABLE_SORT [$$56(ASC), $$24(ASC), $$55(ASC)]  |PARTITIONED|
+                                              -- HASH_PARTITION_EXCHANGE [$$56, $$24, $$55]  |PARTITIONED|
                                                 -- STREAM_PROJECT  |PARTITIONED|
                                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                     -- NESTED_LOOP  |PARTITIONED|
                                                       -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                        -- STREAM_PROJECT  |PARTITIONED|
-                                                          -- STREAM_SELECT  |PARTITIONED|
-                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                              -- PRE_CLUSTERED_GROUP_BY[$$53, $$39]  |PARTITIONED|
-                                                                      {
-                                                                        -- AGGREGATE  |LOCAL|
-                                                                          -- STREAM_SELECT  |LOCAL|
-                                                                            -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                                                      }
-                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                  -- STABLE_SORT [$$53(ASC), $$39(ASC)]  |PARTITIONED|
-                                                                    -- HASH_PARTITION_EXCHANGE [$$53, $$39]  |PARTITIONED|
-                                                                      -- STREAM_PROJECT  |PARTITIONED|
-                                                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                          -- HYBRID_HASH_JOIN [$$51][$$42]  |PARTITIONED|
-                                                                            -- HASH_PARTITION_EXCHANGE [$$51]  |PARTITIONED|
-                                                                              -- NESTED_LOOP  |PARTITIONED|
-                                                                                -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
-                                                                                    -- ASSIGN  |PARTITIONED|
-                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                                -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                        -- STREAM_SELECT  |PARTITIONED|
+                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                            -- PRE_CLUSTERED_GROUP_BY[$$50, $$39]  |PARTITIONED|
+                                                                    {
+                                                                      -- AGGREGATE  |LOCAL|
+                                                                        -- STREAM_SELECT  |LOCAL|
+                                                                          -- NESTED_TUPLE_SOURCE  |LOCAL|
+                                                                    }
+                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                -- STABLE_SORT [$$50(ASC), $$39(ASC)]  |PARTITIONED|
+                                                                  -- HASH_PARTITION_EXCHANGE [$$50, $$39]  |PARTITIONED|
+                                                                    -- STREAM_PROJECT  |PARTITIONED|
+                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                        -- HYBRID_HASH_JOIN [$$48][$$42]  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$48]  |PARTITIONED|
+                                                                            -- NESTED_LOOP  |PARTITIONED|
+                                                                              -- BROADCAST_EXCHANGE  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
                                                                                   -- ASSIGN  |PARTITIONED|
-                                                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                                                      -- ASSIGN  |PARTITIONED|
+                                                                                    -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                      -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                          -- DATASOURCE_SCAN  |PARTITIONED|
-                                                                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                                                            -- HASH_PARTITION_EXCHANGE [$$42]  |PARTITIONED|
-                                                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                                                -- ASSIGN  |PARTITIONED|
-                                                                                  -- STREAM_PROJECT  |PARTITIONED|
+                                                                                          -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ASSIGN  |PARTITIONED|
                                                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                       -- DATASOURCE_SCAN  |PARTITIONED|
                                                                                         -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                                                           -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                                                          -- HASH_PARTITION_EXCHANGE [$$42]  |PARTITIONED|
+                                                                            -- STREAM_PROJECT  |PARTITIONED|
+                                                                              -- ASSIGN  |PARTITIONED|
+                                                                                -- STREAM_PROJECT  |PARTITIONED|
+                                                                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                    -- DATASOURCE_SCAN  |PARTITIONED|
+                                                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                                                                        -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                         -- STREAM_PROJECT  |PARTITIONED|
                                                           -- ASSIGN  |PARTITIONED|

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan b/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
index 7cf5bc4..ef95877 100644
--- a/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
+++ b/asterix-app/src/test/resources/optimizerts/results/udfs/query-ASTERIXDB-1020.plan
@@ -1,63 +1,58 @@
 -- DISTRIBUTE_RESULT  |PARTITIONED|
   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-    -- STREAM_PROJECT  |PARTITIONED|
-      -- ASSIGN  |PARTITIONED|
+    -- NESTED_LOOP  |PARTITIONED|
+      -- BROADCAST_EXCHANGE  |PARTITIONED|
         -- STREAM_PROJECT  |PARTITIONED|
           -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-            -- HYBRID_HASH_JOIN [$$26][$$38]  |PARTITIONED|
+            -- DATASOURCE_SCAN  |PARTITIONED|
               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                -- ASSIGN  |PARTITIONED|
-                  -- STREAM_PROJECT  |PARTITIONED|
-                    -- ASSIGN  |PARTITIONED|
+                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+        -- NESTED_LOOP  |PARTITIONED|
+          -- BROADCAST_EXCHANGE  |PARTITIONED|
+            -- STREAM_PROJECT  |PARTITIONED|
+              -- STREAM_SELECT  |PARTITIONED|
+                -- STREAM_PROJECT  |PARTITIONED|
+                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                    -- PRE_CLUSTERED_GROUP_BY[$$32]  |PARTITIONED|
+                            {
+                              -- AGGREGATE  |LOCAL|
+                                -- STREAM_SELECT  |LOCAL|
+                                  -- NESTED_TUPLE_SOURCE  |LOCAL|
+                            }
                       -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- SPLIT  |PARTITIONED|
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                        -- STABLE_SORT [$$32(ASC)]  |PARTITIONED|
+                          -- HASH_PARTITION_EXCHANGE [$$32]  |PARTITIONED|
                             -- STREAM_PROJECT  |PARTITIONED|
                               -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                -- DATASOURCE_SCAN  |PARTITIONED|
-                                  -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-              -- HASH_PARTITION_EXCHANGE [$$38]  |PARTITIONED|
-                -- STREAM_PROJECT  |PARTITIONED|
-                  -- STREAM_SELECT  |PARTITIONED|
-                    -- STREAM_PROJECT  |PARTITIONED|
-                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                        -- PRE_CLUSTERED_GROUP_BY[$$27, $$36]  |PARTITIONED|
-                                {
-                                  -- AGGREGATE  |LOCAL|
-                                    -- STREAM_SELECT  |LOCAL|
-                                      -- NESTED_TUPLE_SOURCE  |LOCAL|
-                                }
-                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                            -- STABLE_SORT [$$27(ASC), $$36(ASC)]  |PARTITIONED|
-                              -- HASH_PARTITION_EXCHANGE [$$27, $$36]  |PARTITIONED|
                                 -- NESTED_LOOP  |PARTITIONED|
                                   -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                    -- SPLIT  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- STREAM_PROJECT  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- DATASOURCE_SCAN  |PARTITIONED|
-                                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- ASSIGN  |PARTITIONED|
+                                        -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                          -- DATASOURCE_SCAN  |PARTITIONED|
+                                            -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                              -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
                                   -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                    -- STREAM_PROJECT  |PARTITIONED|
-                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                        -- NESTED_LOOP  |PARTITIONED|
-                                          -- BROADCAST_EXCHANGE  |PARTITIONED|
-                                            -- STREAM_PROJECT  |PARTITIONED|
-                                              -- ASSIGN  |PARTITIONED|
+                                    -- ASSIGN  |PARTITIONED|
+                                      -- STREAM_PROJECT  |PARTITIONED|
+                                        -- STREAM_SELECT  |PARTITIONED|
+                                          -- STREAM_PROJECT  |PARTITIONED|
+                                            -- ASSIGN  |PARTITIONED|
+                                              -- STREAM_PROJECT  |PARTITIONED|
                                                 -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                   -- DATASOURCE_SCAN  |PARTITIONED|
                                                     -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
                                                       -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
-                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                            -- ASSIGN  |PARTITIONED|
-                                              -- STREAM_PROJECT  |PARTITIONED|
-                                                -- STREAM_SELECT  |PARTITIONED|
-                                                  -- ASSIGN  |PARTITIONED|
-                                                    -- STREAM_PROJECT  |PARTITIONED|
-                                                      -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                        -- DATASOURCE_SCAN  |PARTITIONED|
-                                                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
-                                                            -- EMPTY_TUPLE_SOURCE  |PARTITIONED|
+          -- ONE_TO_ONE_EXCHANGE  |UNPARTITIONED|
+            -- STREAM_PROJECT  |UNPARTITIONED|
+              -- ASSIGN  |UNPARTITIONED|
+                -- AGGREGATE  |UNPARTITIONED|
+                  -- RANDOM_MERGE_EXCHANGE  |PARTITIONED|
+                    -- STREAM_PROJECT  |PARTITIONED|
+                      -- ASSIGN  |PARTITIONED|
+                        -- STREAM_PROJECT  |PARTITIONED|
+                          -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                            -- DATASOURCE_SCAN  |PARTITIONED|
+                              -- ONE_TO_ONE_EXCHANGE  |PARTITIONED|
+                                -- EMPTY_TUPLE_SOURCE  |PARTITIONED|

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.1.ddl.sqlpp
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.1.ddl.sqlpp b/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.1.ddl.sqlpp
new file mode 100644
index 0000000..2a24740
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.1.ddl.sqlpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+DROP DATABASE test IF EXISTS;
+CREATE DATABASE test;
+USE test;
+
+
+CREATE TYPE OrderType AS CLOSED {
+  o_orderkey: int32,
+  o_custkey: int32,
+  o_orderstatus: string,
+  o_totalprice: double,
+  o_orderdate: string,
+  o_orderpriority: string,
+  o_clerk: string,
+  o_shippriority: int32,
+  o_comment: string
+}
+
+CREATE TYPE CustomerType AS CLOSED {
+  c_custkey: int32,
+  c_name: string,
+  c_address: string,
+  c_nationkey: int32,
+  c_phone: string,
+  c_acctbal: double,
+  c_mktsegment: string,
+  c_comment: string
+}
+
+
+CREATE EXTERNAL TABLE Customers(CustomerType) USING "localfs"
+(("path"="asterix_nc1://data/tpch0.001/customer.tbl"),
+("input-format"="text-input-format"),("format"="delimited-text"),("delimiter"="|"));
+
+CREATE EXTERNAL TABLE Orders(OrderType) USING "localfs"
+(("path"="asterix_nc1://data/tpch0.001/orders.tbl"),
+("input-format"="text-input-format"),("format"="delimited-text"),("delimiter"="|"));

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ae555bab/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.2.update.sqlpp
----------------------------------------------------------------------
diff --git a/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.2.update.sqlpp b/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.2.update.sqlpp
new file mode 100644
index 0000000..7220975
--- /dev/null
+++ b/asterix-app/src/test/resources/runtimets/queries_sqlpp/subquery/decorrelate_with_unique_id/decorrelate_with_unique_id.2.update.sqlpp
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
\ No newline at end of file