You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/01/28 05:46:02 UTC

[2/2] incubator-asterixdb-hyracks git commit: ASTERIXDB-1005, ASTERIXDB-1263: clean up subplan flattening, including: 1. Fixed the data property propagation in HashJoin, NestedLoopJoin, PreClusteredGroupBy, and BroadcastExchange; 2. Fixed race conditions i

ASTERIXDB-1005, ASTERIXDB-1263: clean up subplan flattening, including:
1. Fixed the data property propagation in HashJoin, NestedLoopJoin, PreClusteredGroupBy, and BroadcastExchange;
2. Fixed race conditions in SplitOperatorDescriptor;
3. Added a top-down pass for JobBuilder to set location constraints;
4. Fixed AbstractIntroduceGroupByCombinerRule for general cases.

Change-Id: I0197dc879cf983577e63ea5c047144966c0f7a3c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/572
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Till Westmann <ti...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/0b642674
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/0b642674
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/0b642674

Branch: refs/heads/master
Commit: 0b642674a4a72b2478028abd11ed03f38322a619
Parents: 67df1b1
Author: Yingyi Bu <yi...@couchbase.com>
Authored: Wed Jan 27 19:22:37 2016 -0800
Committer: Yingyi Bu <bu...@gmail.com>
Committed: Wed Jan 27 20:41:33 2016 -0800

----------------------------------------------------------------------
 .../core/algebra/base/IOptimizationContext.java |   7 +-
 .../visitors/EnforceVariablesVisitor.java       | 391 -------------------
 .../logical/visitors/IsomorphismUtilities.java  |  11 +-
 ...pressionDeepCopyWithNewVariablesVisitor.java |   3 +
 ...OperatorDeepCopyWithNewVariablesVisitor.java |   2 +-
 .../visitors/OperatorDeepCopyVisitor.java       |   2 +-
 .../visitors/SubstituteVariableVisitor.java     |   4 +-
 .../logical/visitors/UsedVariableVisitor.java   |   4 +-
 .../logical/visitors/VariableUtilities.java     |  53 ++-
 .../physical/AbstractHashJoinPOperator.java     |  16 +-
 .../AbstractPreclusteredGroupByPOperator.java   |  22 +-
 .../operators/physical/BroadcastPOperator.java  |   9 +-
 .../physical/HybridHashJoinPOperator.java       |  56 ++-
 .../operators/physical/NLJoinPOperator.java     |  14 +-
 .../physical/RandomPartitionPOperator.java      |  10 +-
 .../algebra/util/OperatorManipulationUtil.java  |  29 ++
 .../algebricks/core/jobgen/impl/JobBuilder.java |  55 ++-
 .../base/AlgebricksOptimizationContext.java     |  27 +-
 .../visitors/EnforceVariablesVisitorTest.java   | 190 ---------
 .../AbstractIntroduceGroupByCombinerRule.java   | 126 ++++--
 .../rules/EnforceStructuralPropertiesRule.java  |  72 ++--
 .../rules/ExtractCommonOperatorsRule.java       |  13 +-
 .../rewriter/rules/PushProjectDownRule.java     |  23 +-
 .../IntroduceLeftOuterJoinForSubplanRule.java   |   5 +-
 .../RandomPartitionComputerFactory.java         |  32 +-
 .../MToNReplicatingConnectorDescriptor.java     |   7 +-
 .../std/misc/MaterializerTaskState.java         |  18 +-
 .../std/misc/SplitOperatorDescriptor.java       |  64 ++-
 28 files changed, 436 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index a2ed2ae..813e58f 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -41,6 +41,7 @@ public interface IOptimizationContext extends ITypingContext {
 
     public abstract LogicalVariable newVar();
 
+    @Override
     public abstract IMetadataProvider<?, ?> getMetadataProvider();
 
     public abstract void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider);
@@ -53,7 +54,7 @@ public interface IOptimizationContext extends ITypingContext {
      * returns true if op1 and op2 have already been compared
      */
     public abstract boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2);
-    
+
     public abstract void removeFromAlreadyCompared(ILogicalOperator op1);
 
     public abstract void addNotToBeInlinedVar(LogicalVariable var);
@@ -72,6 +73,8 @@ public interface IOptimizationContext extends ITypingContext {
 
     public abstract List<FunctionalDependency> getFDList(ILogicalOperator op);
 
+    public void clearAllFDAndEquivalenceClasses();
+
     public abstract void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v);
 
     public abstract ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op);
@@ -89,6 +92,6 @@ public interface IOptimizationContext extends ITypingContext {
     public abstract void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException;
 
     public abstract void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars);
-    
+
     public abstract LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor();
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
deleted file mode 100644
index d8089bd..0000000
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitor.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
-import org.apache.hyracks.algebricks.core.algebra.visitors.IQueryOperatorVisitor;
-
-/**
- * This visitor is to add back variables that are killed in the query plan rooted at an input operator.
- * After visiting, it also provides a variable map for variables that have been
- * mapped in the query plan, e.g., by group-by, assign, and union.
- */
-class EnforceVariablesVisitor implements IQueryOperatorVisitor<ILogicalOperator, Collection<LogicalVariable>> {
-    private final IOptimizationContext context;
-    private final Map<LogicalVariable, LogicalVariable> inputVarToOutputVarMap = new HashMap<>();
-
-    public EnforceVariablesVisitor(IOptimizationContext context) {
-        this.context = context;
-    }
-
-    public Map<LogicalVariable, LogicalVariable> getInputVariableToOutputVariableMap() {
-        return inputVarToOutputVarMap;
-    }
-
-    @Override
-    public ILogicalOperator visitAggregateOperator(AggregateOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return rewriteAggregateOperator(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitRunningAggregateOperator(RunningAggregateOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return rewriteAggregateOperator(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitGroupByOperator(GroupByOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.removeAll(liveVars);
-
-        // Maps group by key variables if the corresponding expressions are VariableReferenceExpressions.
-        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> keyVarExprRef : op.getGroupByList()) {
-            ILogicalExpression expr = keyVarExprRef.second.getValue();
-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
-                LogicalVariable sourceVar = varExpr.getVariableReference();
-                updateVarMapping(sourceVar, keyVarExprRef.first);
-                varsToRecover.remove(sourceVar);
-            }
-        }
-
-        for (LogicalVariable varToRecover : varsToRecover) {
-            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
-            // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
-            op.getDecorList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(null,
-                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
-        }
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitLimitOperator(LimitOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitInnerJoinOperator(InnerJoinOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitLeftOuterJoinOperator(LeftOuterJoinOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitOrderOperator(OrderOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitAssignOperator(AssignOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        List<Mutable<ILogicalExpression>> assignedExprRefs = op.getExpressions();
-        List<LogicalVariable> assignedVars = op.getVariables();
-
-        // Maps assigning variables if assignment expressions are VariableReferenceExpressions.
-        for (int index = 0; index < assignedVars.size(); ++index) {
-            ILogicalExpression expr = assignedExprRefs.get(index).getValue();
-            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
-                VariableReferenceExpression varExpr = (VariableReferenceExpression) expr;
-                LogicalVariable sourceVar = varExpr.getVariableReference();
-                updateVarMapping(sourceVar, assignedVars.get(index));
-                varsToRecover.remove(sourceVar);
-            }
-        }
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitSelectOperator(SelectOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitExtensionOperator(ExtensionOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitProjectOperator(ProjectOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        varsToRecover.removeAll(op.getVariables());
-        // Adds all missing variables that should propagates up.
-        op.getVariables().addAll(varsToRecover);
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitScriptOperator(ScriptOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        throw new UnsupportedOperationException("Script operators in a subplan are not supported!");
-    }
-
-    @Override
-    public ILogicalOperator visitSubplanOperator(SubplanOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitUnionOperator(UnionAllOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        // Update the variable mappings
-        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varTriples = op.getVariableMappings();
-        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> triple : varTriples) {
-            updateVarMapping(triple.second, triple.first);
-            updateVarMapping(triple.third, triple.first);
-        }
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitUnnestOperator(UnnestOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitOuterUnnestOperator(OuterUnnestOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.remove(liveVars);
-        if (!varsToRecover.isEmpty()) {
-            op.setPropagatesInput(true);
-            return visitsInputs(op, varsToRecover);
-        }
-        return op;
-    }
-
-    @Override
-    public ILogicalOperator visitDataScanOperator(DataSourceScanOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitDistinctOperator(DistinctOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitExchangeOperator(ExchangeOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    @Override
-    public ILogicalOperator visitExternalDataLookupOperator(ExternalDataLookupOperator op,
-            Collection<LogicalVariable> varsToRecover) throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.retainAll(liveVars);
-        if (!varsToRecover.isEmpty()) {
-            op.setPropagateInput(true);
-            return visitsInputs(op, varsToRecover);
-        }
-        return op;
-    }
-
-    @Override
-    public ILogicalOperator visitTokenizeOperator(TokenizeOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        return visitsInputs(op, varsToRecover);
-    }
-
-    /**
-     * Wraps an AggregateOperator or RunningAggregateOperator with a group-by operator where
-     * the group-by keys are variables in varsToRecover.
-     * Note that the function here prevents this visitor being used to rewrite arbitrary query plans.
-     * Instead, it could only be used for rewriting a nested plan within a subplan operator.
-     *
-     * @param op
-     *            the logical operator for aggregate or running aggregate.
-     * @param varsToRecover
-     *            the set of variables that needs to preserve.
-     * @return the wrapped group-by operator if {@code varsToRecover} is not empty, and {@code op} otherwise.
-     * @throws AlgebricksException
-     */
-    private ILogicalOperator rewriteAggregateOperator(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        Set<LogicalVariable> liveVars = new HashSet<>();
-        VariableUtilities.getLiveVariables(op, liveVars);
-        varsToRecover.removeAll(liveVars);
-
-        GroupByOperator gbyOp = new GroupByOperator();
-        for (LogicalVariable varToRecover : varsToRecover) {
-            // This limits the visitor can only be applied to a nested logical plan inside a Subplan operator,
-            // where the varsToRecover forms a candidate key which can uniquely identify a tuple out of the nested-tuple-source.
-            LogicalVariable newVar = context.newVar();
-            gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(newVar,
-                    new MutableObject<ILogicalExpression>(new VariableReferenceExpression(varToRecover))));
-            updateVarMapping(varToRecover, newVar);
-        }
-
-        NestedTupleSourceOperator nts = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(gbyOp));
-        op.getInputs().clear();
-        op.getInputs().add(new MutableObject<ILogicalOperator>(nts));
-
-        ILogicalOperator inputOp = op.getInputs().get(0).getValue();
-        ILogicalPlan nestedPlan = new ALogicalPlanImpl();
-        nestedPlan.getRoots().add(new MutableObject<ILogicalOperator>(op));
-        gbyOp.getNestedPlans().add(nestedPlan);
-        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(inputOp));
-
-        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(op, context);
-        return visitsInputs(gbyOp, varsToRecover);
-    }
-
-    private ILogicalOperator visitsInputs(ILogicalOperator op, Collection<LogicalVariable> varsToRecover)
-            throws AlgebricksException {
-        if (op.getInputs().size() == 0 || varsToRecover.isEmpty()) {
-            return op;
-        }
-        Set<LogicalVariable> producedVars = new HashSet<>();
-        VariableUtilities.getProducedVariables(op, producedVars);
-        varsToRecover.removeAll(producedVars);
-        if (!varsToRecover.isEmpty()) {
-            if (op.getInputs().size() == 1) {
-                // Deals with single input operators.
-                ILogicalOperator newOp = op.getInputs().get(0).getValue().accept(this, varsToRecover);
-                op.getInputs().get(0).setValue(newOp);
-            } else {
-                // Deals with multi-input operators.
-                for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
-                    ILogicalOperator child = childRef.getValue();
-                    Set<LogicalVariable> varsToRecoverInChild = new HashSet<>();
-                    VariableUtilities.getProducedVariablesInDescendantsAndSelf(child, varsToRecoverInChild);
-                    // Obtains the variables that this particular child should propagate.
-                    varsToRecoverInChild.retainAll(varsToRecover);
-                    ILogicalOperator newChild = child.accept(this, varsToRecoverInChild);
-                    childRef.setValue(newChild);
-                }
-            }
-        }
-        return op;
-    }
-
-    private void updateVarMapping(LogicalVariable oldVar, LogicalVariable newVar) {
-        if (oldVar.equals(newVar)) {
-            return;
-        }
-        LogicalVariable mappedVar = newVar;
-        if (inputVarToOutputVarMap.containsKey(newVar)) {
-            mappedVar = inputVarToOutputVarMap.get(newVar);
-            inputVarToOutputVarMap.remove(newVar);
-        }
-        inputVarToOutputVarMap.put(oldVar, mappedVar);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
index aa9848c..b86e8e9 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
@@ -22,7 +22,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
@@ -45,14 +44,16 @@ public class IsomorphismUtilities {
             throws AlgebricksException {
         List<Mutable<ILogicalOperator>> inputs1 = op.getInputs();
         List<Mutable<ILogicalOperator>> inputs2 = arg.getInputs();
-        if (inputs1.size() != inputs2.size())
-            return Boolean.FALSE;
+        if (inputs1.size() != inputs2.size()) {
+            return false;
+        }
         for (int i = 0; i < inputs1.size(); i++) {
             ILogicalOperator input1 = inputs1.get(i).getValue();
             ILogicalOperator input2 = inputs2.get(i).getValue();
             boolean isomorphic = isOperatorIsomorphicPlanSegment(input1, input2);
-            if (!isomorphic)
-                return Boolean.FALSE;
+            if (!isomorphic) {
+                return false;
+            }
         }
         return IsomorphismUtilities.isOperatorIsomorphic(op, arg);
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
index f0bcd34..da252d4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalExpressionDeepCopyWithNewVariablesVisitor.java
@@ -52,6 +52,9 @@ public class LogicalExpressionDeepCopyWithNewVariablesVisitor
     }
 
     public ILogicalExpression deepCopy(ILogicalExpression expr) throws AlgebricksException {
+        if (expr == null) {
+            return null;
+        }
         return expr.accept(this, null);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index ec7d7fe..2a92ead 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -348,7 +348,7 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
     @Override
     public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, ILogicalOperator arg)
             throws AlgebricksException {
-        NestedTupleSourceOperator opCopy = new NestedTupleSourceOperator(new MutableObject<ILogicalOperator>(arg));
+        NestedTupleSourceOperator opCopy = new NestedTupleSourceOperator(op.getDataSourceReference());
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 74e74a6..5cf30c7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -134,7 +134,7 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
     @Override
     public ILogicalOperator visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg)
             throws AlgebricksException {
-        return new NestedTupleSourceOperator(null);
+        return new NestedTupleSourceOperator(op.getDataSourceReference());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index ddcf6c8..4b791c1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -436,7 +436,9 @@ public class SubstituteVariableVisitor
             return;
         }
         IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);
-        env.substituteProducedVariable(arg.first, arg.second);
+        if (env != null) {
+            env.substituteProducedVariable(arg.first, arg.second);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index f80e1cd..e82b4f7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
@@ -167,6 +166,9 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
                     }
                     break;
                 }
+                case RANDOM_PARTITION_EXCHANGE: {
+                    break;
+                }
                 default: {
                     throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
                 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 70eb544..0352f83 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -28,10 +28,8 @@ import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
 public class VariableUtilities {
@@ -54,6 +52,14 @@ public class VariableUtilities {
         op.accept(visitor, null);
     }
 
+    public static void getSubplanLocalLiveVariables(ILogicalOperator op, Collection<LogicalVariable> liveVariables)
+            throws AlgebricksException {
+        VariableUtilities.getLiveVariables(op, liveVariables);
+        Set<LogicalVariable> locallyProducedVars = new HashSet<>();
+        VariableUtilities.getProducedVariablesInDescendantsAndSelf(op, locallyProducedVars);
+        liveVariables.retainAll(locallyProducedVars);
+    }
+
     public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
             throws AlgebricksException {
         // DFS traversal
@@ -77,6 +83,21 @@ public class VariableUtilities {
         substituteVariables(op, v1, v2, true, ctx);
     }
 
+    public static void substituteVariables(ILogicalOperator op, Map<LogicalVariable, LogicalVariable> varMap,
+            ITypingContext ctx) throws AlgebricksException {
+        for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+            VariableUtilities.substituteVariables(op, entry.getKey(), entry.getValue(), ctx);
+        }
+    }
+
+    public static void substituteVariables(ILogicalOperator op,
+            List<Pair<LogicalVariable, LogicalVariable>> oldVarNewVarMapHistory, ITypingContext ctx)
+                    throws AlgebricksException {
+        for (Pair<LogicalVariable, LogicalVariable> entry : oldVarNewVarMapHistory) {
+            VariableUtilities.substituteVariables(op, entry.first, entry.second, ctx);
+        }
+    }
+
     public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
             LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
         for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
@@ -100,32 +121,4 @@ public class VariableUtilities {
         return varSet.equals(varArgSet);
     }
 
-    /**
-     * Recursively modifies the query plan to make sure every variable in {@code varsToEnforce}
-     * be part of the output schema of {@code opRef}.
-     *
-     * @param opRef,
-     *            the operator to enforce.
-     * @param varsToEnforce,
-     *            the variables that needs to be live after the operator of {@code opRef}.
-     * @param context,
-     *            the optimization context.
-     * @return a map that maps a variable in {@code varsToEnforce} to yet-another-variable if
-     *         a mapping happens in the query plan under {@code opRef}, e.g., by grouping and assigning.
-     * @throws AlgebricksException
-     */
-    public static Map<LogicalVariable, LogicalVariable> enforceVariablesInDescendantsAndSelf(
-            Mutable<ILogicalOperator> opRef, Collection<LogicalVariable> varsToEnforce, IOptimizationContext context)
-                    throws AlgebricksException {
-        Set<LogicalVariable> copiedVarsToEnforce = new HashSet<>();
-        copiedVarsToEnforce.addAll(varsToEnforce);
-        // Rewrites the query plan
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(context);
-        ILogicalOperator result = opRef.getValue().accept(visitor, copiedVarsToEnforce);
-        opRef.setValue(result);
-        // Re-computes the type environment bottom up.
-        OperatorManipulationUtil.computeTypeEnvironmentBottomUp(result, context);
-        return visitor.getInputVariableToOutputVariableMap();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index f5ea5f1..09d2253 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -127,9 +127,8 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                     public Pair<Boolean, IPartitioningProperty> coordinateRequirements(
                             IPartitioningProperty requirements, IPartitioningProperty firstDeliveredPartitioning,
                             ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
-                        if (firstDeliveredPartitioning != null
-                                && firstDeliveredPartitioning.getPartitioningType() == requirements
-                                        .getPartitioningType()) {
+                        if (firstDeliveredPartitioning != null && firstDeliveredPartitioning
+                                .getPartitioningType() == requirements.getPartitioningType()) {
                             switch (requirements.getPartitioningType()) {
                                 case UNORDERED_PARTITIONED: {
                                     UnorderedPartitionedProperty upp1 = (UnorderedPartitionedProperty) firstDeliveredPartitioning;
@@ -139,8 +138,8 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                                     Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
                                     Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
                                     Set<LogicalVariable> keysCurrent = uppreq.getColumnSet();
-                                    List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent)) ? keysRightBranch
-                                            : keysLeftBranch;
+                                    List<LogicalVariable> keysFirst = (keysRightBranch.containsAll(keysCurrent))
+                                            ? keysRightBranch : keysLeftBranch;
                                     List<LogicalVariable> keysSecond = keysFirst == keysRightBranch ? keysLeftBranch
                                             : keysRightBranch;
                                     for (LogicalVariable r : uppreq.getColumnSet()) {
@@ -155,8 +154,8 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                                             j++;
                                         }
                                         if (!found) {
-                                            throw new IllegalStateException("Did not find a variable equivalent to "
-                                                    + r + " among " + keysFirst);
+                                            throw new IllegalStateException("Did not find a variable equivalent to " + r
+                                                    + " among " + keysFirst);
                                         }
                                         LogicalVariable v2 = keysSecond.get(j);
                                         EquivalenceClass ecFst = eqmap.get(v2);
@@ -167,6 +166,9 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
                                                 break;
                                             }
                                         }
+                                        if (covered.equals(set1)) {
+                                            break;
+                                        }
                                     }
                                     if (!covered.equals(set1)) {
                                         throw new AlgebricksException("Could not modify " + requirements

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
index 66cb6b2..2c93cd4 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -20,12 +20,13 @@ package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
-
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
@@ -54,6 +55,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirement
 import org.apache.hyracks.algebricks.core.algebra.properties.PropertiesUtil;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
 
 public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
 
@@ -155,7 +157,18 @@ public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysi
                     AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
                     IPhysicalOperator pop2 = op2.getPhysicalOperator();
                     if (pop2 instanceof AbstractPreclusteredGroupByPOperator) {
-                        List<LogicalVariable> sndOrder = ((AbstractPreclusteredGroupByPOperator) pop2).getGbyColumns();
+                        List<LogicalVariable> gbyColumns = ((AbstractPreclusteredGroupByPOperator) pop2)
+                                .getGbyColumns();
+                        List<LogicalVariable> sndOrder = new ArrayList<>();
+                        sndOrder.addAll(gbyColumns);
+                        Set<LogicalVariable> freeVars = new HashSet<>();
+                        try {
+                            OperatorPropertiesUtil.getFreeVariablesInSelfOrDesc(op2, freeVars);
+                        } catch (AlgebricksException e) {
+                            throw new IllegalStateException(e);
+                        }
+                        // Only considers group key variables defined outside the outermost group-by operator.
+                        sndOrder.retainAll(freeVars);
                         groupProp.getColumnSet().addAll(sndOrder);
                         groupProp.getPreferredOrderEnforcer().addAll(sndOrder);
                         goon = false;
@@ -210,9 +223,8 @@ public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysi
                     tl.add(((VariableReferenceExpression) decorPair.second.getValue()).getVariableReference());
                     fdList.add(new FunctionalDependency(hd, tl));
                 }
-                if (allOk
-                        && PropertiesUtil.matchLocalProperties(localProps, props,
-                                new HashMap<LogicalVariable, EquivalenceClass>(), fdList)) {
+                if (allOk && PropertiesUtil.matchLocalProperties(localProps, props,
+                        new HashMap<LogicalVariable, EquivalenceClass>(), fdList)) {
                     localProps = props;
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
index c0e20d6..03a8666 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/BroadcastPOperator.java
@@ -18,15 +18,17 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
+import java.util.ArrayList;
+
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
@@ -52,10 +54,9 @@ public class BroadcastPOperator extends AbstractExchangePOperator {
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         IPartitioningProperty pp = new BroadcastPartitioningProperty(domain);
-        this.deliveredProperties = new StructuralPropertiesVector(pp, op2.getDeliveredPhysicalProperties()
-                .getLocalProperties());
+        // Broadcasts will destroy input local properties.
+        this.deliveredProperties = new StructuralPropertiesVector(pp, new ArrayList<ILocalStructuralProperty>());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 34e3dc6..071ee72 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -18,12 +18,14 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.logging.Logger;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -34,6 +36,8 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBina
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
@@ -106,14 +110,14 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
         int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
-        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper.variablesToBinaryHashFunctionFactories(
-                keysLeftBranch, env, context);
-        IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(
-                keysLeftBranch, env, context);
+        IBinaryHashFunctionFactory[] hashFunFactories = JobGenHelper
+                .variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
+        IBinaryHashFunctionFamily[] hashFunFamilies = JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch,
+                env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
         int i = 0;
         IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
@@ -173,9 +177,10 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
                     case INNER: {
                         opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft), predEvaluatorFactory);
+                                comparatorFactories, recDescriptor,
+                                new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+                                new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
+                                predEvaluatorFactory);
                         break;
                     }
                     case LEFT_OUTER: {
@@ -185,9 +190,10 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
                         }
                         opDesc = new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                                 maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                                comparatorFactories, recDescriptor, new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysLeft, keysRight), new JoinMultiComparatorFactory(comparatorFactories,
-                                        keysRight, keysLeft), predEvaluatorFactory, true, nullWriterFactories);
+                                comparatorFactories, recDescriptor,
+                                new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
+                                new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft),
+                                predEvaluatorFactory, true, nullWriterFactories);
                         break;
                     }
                     default: {
@@ -209,7 +215,31 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
     @Override
     protected List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op, IOptimizationContext context)
             throws AlgebricksException {
-        return new LinkedList<ILocalStructuralProperty>();
+        List<ILocalStructuralProperty> deliveredLocalProperties = new ArrayList<ILocalStructuralProperty>();
+        // An inner join can trigger the "role reversal" optimization, which can destroy data properties of the probe side.
+        if (kind == JoinKind.LEFT_OUTER) {
+            AbstractLogicalOperator probeOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            IPhysicalPropertiesVector probeSideProperties = probeOp.getPhysicalOperator().getDeliveredProperties();
+            List<ILocalStructuralProperty> probeSideLocalProperties = probeSideProperties.getLocalProperties();
+            if (probeSideLocalProperties != null) {
+                // The local grouping property in the probe side will be maintained
+                // and the local ordering property in the probe side will be turned into a local grouping property
+                // if the grouping variables (or sort columns) in the local property contain all the join key variables
+                // for the left branch:
+                // 1. in case spilling is not kicked off, the ordering property is maintained and hence local grouping property is maintained.
+                // 2. if spilling is kicked off, the grouping property is still maintained though the ordering property is destroyed.
+                for (ILocalStructuralProperty property : probeSideLocalProperties) {
+                    Set<LogicalVariable> groupingVars = new ListSet<LogicalVariable>();
+                    Set<LogicalVariable> leftBranchVars = new ListSet<LogicalVariable>();
+                    property.getVariables(groupingVars);
+                    leftBranchVars.addAll(getKeysLeftBranch());
+                    if (groupingVars.containsAll(leftBranchVars)) {
+                        deliveredLocalProperties.add(new LocalGroupingProperty(groupingVars));
+                    }
+                }
+            }
+        }
+        return deliveredLocalProperties;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
index a923944..7ff15d7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/NLJoinPOperator.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -105,8 +104,11 @@ public class NLJoinPOperator extends AbstractJoinPOperator {
             pp = IPartitioningProperty.UNPARTITIONED;
         }
 
-        List<ILocalStructuralProperty> localProps = new LinkedList<ILocalStructuralProperty>();
-        this.deliveredProperties = new StructuralPropertiesVector(pp, localProps);
+        // Nested loop join maintains the local structural properties of the probe side.
+        AbstractLogicalOperator probeOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector probeSideProperties = probeOp.getPhysicalOperator().getDeliveredProperties();
+        List<ILocalStructuralProperty> probeSideLocalProperties = probeSideProperties.getLocalProperties();
+        this.deliveredProperties = new StructuralPropertiesVector(pp, probeSideLocalProperties);
     }
 
     @Override
@@ -125,7 +127,7 @@ public class NLJoinPOperator extends AbstractJoinPOperator {
     @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         AbstractBinaryJoinOperator join = (AbstractBinaryJoinOperator) op;
         RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
                 propagatedSchema, context);
@@ -221,8 +223,8 @@ public class NLJoinPOperator extends AbstractJoinPOperator {
             } catch (AlgebricksException ae) {
                 throw new HyracksDataException(ae);
             }
-            boolean result = binaryBooleanInspector
-                    .getBooleanValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+            boolean result = binaryBooleanInspector.getBooleanValue(p.getByteArray(), p.getStartOffset(),
+                    p.getLength());
             if (result)
                 return 0;
             else

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
index 42e6bcf..af0087d 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RandomPartitionPOperator.java
@@ -47,9 +47,10 @@ public class RandomPartitionPOperator extends AbstractExchangePOperator {
         this.domain = domain;
     }
 
+    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
-            throws AlgebricksException {
+                    throws AlgebricksException {
         Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
                 opSchema, context);
         builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
@@ -62,9 +63,10 @@ public class RandomPartitionPOperator extends AbstractExchangePOperator {
         return false;
     }
 
+    @Override
     public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
-        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory(domain.cardinality());
+        ITuplePartitionComputerFactory tpcf = new RandomPartitionComputerFactory();
         MToNPartitioningConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
         return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
     }
@@ -77,8 +79,8 @@ public class RandomPartitionPOperator extends AbstractExchangePOperator {
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain), op2
-                .getDeliveredPhysicalProperties().getLocalProperties());
+        this.deliveredProperties = new StructuralPropertiesVector(new RandomPartitioningProperty(domain),
+                op2.getDeliveredPhysicalProperties().getLocalProperties());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index b4569e4..b5099b1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.algebricks.core.algebra.util;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -239,7 +240,35 @@ public class OperatorManipulationUtil {
         for (Mutable<ILogicalOperator> children : op.getInputs()) {
             computeTypeEnvironmentBottomUp(children.getValue(), context);
         }
+        AbstractLogicalOperator abstractOp = (AbstractLogicalOperator) op;
+        if (abstractOp.hasNestedPlans()) {
+            for (ILogicalPlan p : ((AbstractOperatorWithNestedPlans) op).getNestedPlans()) {
+                for (Mutable<ILogicalOperator> rootRef : p.getRoots()) {
+                    computeTypeEnvironmentBottomUp(rootRef.getValue(), context);
+                }
+            }
+        }
         context.computeAndSetTypeEnvironmentForOperator(op);
     }
 
+    /**
+     * Is <code>op</code> itself, or any operator reachable through its inputs, tagged with a tag in <code>tags</code>?
+     *
+     * @param op
+     * @param tags
+     * @return True if yes; false otherwise.
+     */
+    public static boolean ancestorOfOperators(ILogicalOperator op, Set<LogicalOperatorTag> tags) {
+        LogicalOperatorTag opTag = op.getOperatorTag();
+        if (tags.contains(opTag)) {
+            return true;
+        }
+        for (Mutable<ILogicalOperator> children : op.getInputs()) {
+            if (ancestorOfOperators(children.getValue(), tags)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
index 8e254f0..d130d4c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -98,7 +98,8 @@ public class JobBuilder implements IHyracksJobBuilder {
     }
 
     @Override
-    public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest, int destInputIndex) {
+    public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest,
+            int destInputIndex) {
         ArrayList<ILogicalOperator> outputs = outEdges.get(src);
         if (outputs == null) {
             outputs = new ArrayList<ILogicalOperator>();
@@ -144,7 +145,13 @@ public class JobBuilder implements IHyracksJobBuilder {
         List<OperatorDescriptorId> roots = jobSpec.getRoots();
         setSpecifiedPartitionConstraints();
         for (OperatorDescriptorId rootId : roots) {
-            setPartitionConstraintsDFS(rootId, tgtConstraints, null);
+            setPartitionConstraintsBottomup(rootId, tgtConstraints, null, false);
+        }
+        for (OperatorDescriptorId rootId : roots) {
+            setPartitionConstraintsTopdown(rootId, tgtConstraints, null);
+        }
+        for (OperatorDescriptorId rootId : roots) {
+            setPartitionConstraintsBottomup(rootId, tgtConstraints, null, true);
         }
     }
 
@@ -161,7 +168,7 @@ public class JobBuilder implements IHyracksJobBuilder {
         }
     }
 
-    private void setPartitionConstraintsDFS(OperatorDescriptorId opId,
+    private void setPartitionConstraintsTopdown(OperatorDescriptorId opId,
             Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp) {
         List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
         AlgebricksPartitionConstraint opConstraint = null;
@@ -172,9 +179,39 @@ public class JobBuilder implements IHyracksJobBuilder {
                 org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
                         .getConnectorOperatorMap().get(cid);
                 IOperatorDescriptor src = p.getLeft().getLeft();
-                // DFS
-                setPartitionConstraintsDFS(src.getOperatorId(), tgtConstraints, opDesc);
+                TargetConstraint constraint = tgtConstraints.get(conn);
+                if (constraint != null) {
+                    if (constraint == TargetConstraint.SAME_COUNT) {
+                        opConstraint = partitionConstraintMap.get(opDesc);
+                        if (partitionConstraintMap.get(src) == null) {
+                            if (opConstraint != null) {
+                                partitionConstraintMap.put(src, opConstraint);
+                                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, src,
+                                        opConstraint);
+                            }
+                        }
+                    }
+                }
+                // Pre-order DFS: the source's constraint is set above, before descending into it.
+                setPartitionConstraintsTopdown(src.getOperatorId(), tgtConstraints, opDesc);
+            }
+        }
+    }
 
+    private void setPartitionConstraintsBottomup(OperatorDescriptorId opId,
+            Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp,
+            boolean finalPass) {
+        List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
+        AlgebricksPartitionConstraint opConstraint = null;
+        IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(opId);
+        if (opInputs != null) {
+            for (IConnectorDescriptor conn : opInputs) {
+                ConnectorDescriptorId cid = conn.getConnectorId();
+                org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
+                        .getConnectorOperatorMap().get(cid);
+                IOperatorDescriptor src = p.getLeft().getLeft();
+                // Post-order DFS: visit the source first, then handle this connector's constraint.
+                setPartitionConstraintsBottomup(src.getOperatorId(), tgtConstraints, opDesc, finalPass);
                 TargetConstraint constraint = tgtConstraints.get(conn);
                 if (constraint != null) {
                     switch (constraint) {
@@ -200,12 +237,14 @@ public class JobBuilder implements IHyracksJobBuilder {
                         opConstraint = new AlgebricksCountPartitionConstraint(1);
                     }
                 }
-                if (opConstraint == null) {
+                if (opConstraint == null && finalPass) {
                     opConstraint = clusterLocations;
                 }
             }
-            partitionConstraintMap.put(opDesc, opConstraint);
-            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+            if (opConstraint != null) {
+                partitionConstraintMap.put(opDesc, opConstraint);
+                AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 16c4855..286d5c7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -67,7 +67,6 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
     private Map<LogicalVariable, FunctionalDependency> recordToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
 
-    @SuppressWarnings("unchecked")
     private IMetadataProvider metadataProvider;
     private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
 
@@ -90,7 +89,8 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
             IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
             IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
-            PhysicalOptimizationConfig physicalOptimizationConfig, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
+            PhysicalOptimizationConfig physicalOptimizationConfig,
+            LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
         this.varCounter = varCounter;
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
         this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
@@ -100,29 +100,35 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
         this.prettyPrintVisitor = prettyPrintVisitor;
     }
 
+    @Override
     public int getVarCounter() {
         return varCounter;
     }
 
+    @Override
     public void setVarCounter(int varCounter) {
         this.varCounter = varCounter;
     }
 
+    @Override
     public LogicalVariable newVar() {
         varCounter++;
         LogicalVariable var = new LogicalVariable(varCounter);
         return var;
     }
 
+    @Override
     @SuppressWarnings("unchecked")
     public IMetadataProvider getMetadataProvider() {
         return metadataProvider;
     }
 
+    @Override
     public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider) {
         this.metadataProvider = metadataProvider;
     }
 
+    @Override
     public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
         HashSet<ILogicalOperator> operators = dontApply.get(rule);
         if (operators == null) {
@@ -132,6 +138,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
         }
     }
 
+    @Override
     public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
         HashSet<ILogicalOperator> operators = dontApply.get(rule);
         if (operators == null) {
@@ -164,26 +171,30 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
             }
         }
     }
-    
+
     @Override
     public void removeFromAlreadyCompared(ILogicalOperator op1) {
         alreadyCompared.remove(op1);
     }
 
+    @Override
     public void addNotToBeInlinedVar(LogicalVariable var) {
         notToBeInlinedVars.add(var);
     }
 
+    @Override
     public boolean shouldNotBeInlined(LogicalVariable var) {
         return notToBeInlinedVars.contains(var);
     }
 
+    @Override
     public void addPrimaryKey(FunctionalDependency pk) {
         assert (pk.getTail().size() == 1);
         LogicalVariable recordVar = pk.getTail().get(0);
         recordToPrimaryKey.put(recordVar, pk);
     }
 
+    @Override
     public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
         FunctionalDependency fd = recordToPrimaryKey.get(recordVar);
         if (fd == null) {
@@ -213,6 +224,12 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     }
 
     @Override
+    public void clearAllFDAndEquivalenceClasses() {
+        eqClassGlobalMap.clear();
+        fdGlobalMap.clear();
+    }
+
+    @Override
     public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op) {
         return logicalProps.get(op);
     }
@@ -232,10 +249,12 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
         return varEvalSizeEnv;
     }
 
+    @Override
     public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory() {
         return mergeAggregationExpressionFactory;
     }
 
+    @Override
     public PhysicalOptimizationConfig getPhysicalOptimizationConfig() {
         return physicalOptimizationConfig;
     }
@@ -295,7 +314,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
             me.setValue(new FunctionalDependency(hd, tl));
         }
     }
-    
+
     @Override
     public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
         return prettyPrintVisitor;

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/0b642674/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java b/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
deleted file mode 100644
index d0676cf..0000000
--- a/algebricks/algebricks-core/src/test/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/EnforceVariablesVisitorTest.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.commons.lang3.mutable.MutableObject;
-import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
-import org.junit.Test;
-
-import junit.framework.Assert;
-
-public class EnforceVariablesVisitorTest {
-
-    /**
-     * Tests the processing of project operator in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testProject() throws Exception {
-        // Constructs the input operator.
-        LogicalVariable var = new LogicalVariable(1);
-        List<LogicalVariable> inputVarList = new ArrayList<>();
-        inputVarList.add(var);
-        ProjectOperator projectOp = new ProjectOperator(inputVarList);
-
-        // Constructs the visitor.
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(2);
-        ProjectOperator op = (ProjectOperator) projectOp.accept(visitor,
-                Arrays.asList(new LogicalVariable[] { varToEnforce }));
-
-        // Checks the result.
-        List<LogicalVariable> expectedVars = Arrays.asList(new LogicalVariable[] { var, varToEnforce });
-        Assert.assertEquals(expectedVars, op.getVariables());
-        Assert.assertTrue(visitor.getInputVariableToOutputVariableMap().isEmpty());
-    }
-
-    /**
-     * Tests the processing of group-by operator in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testGroupby() throws Exception {
-        // Constructs the group-by operator.
-        LogicalVariable keyVar = new LogicalVariable(2);
-        LogicalVariable keyExprVar = new LogicalVariable(1);
-        GroupByOperator gbyOp = new GroupByOperator();
-        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
-
-        // Constructs the visitor.
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(3);
-        Set<LogicalVariable> varsToEnforce = new HashSet<>();
-        varsToEnforce.add(keyExprVar);
-        varsToEnforce.add(varToEnforce);
-        GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
-
-        // Checks the result.
-        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
-        expectedVarMap.put(keyExprVar, keyVar);
-        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
-        VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
-                .getValue();
-        Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
-    }
-
-    /**
-     * Tests the processing of aggregate operator in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testAggregate() throws Exception {
-        // Constructs the group-by operator.
-        List<LogicalVariable> aggVars = new ArrayList<>();
-        List<Mutable<ILogicalExpression>> aggExprRefs = new ArrayList<>();
-        AggregateOperator aggOp = new AggregateOperator(aggVars, aggExprRefs);
-
-        // Constructs the visitor.
-        LogicalVariable var = new LogicalVariable(3);
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        when(mockedContext.newVar()).thenReturn(var);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(2);
-        Set<LogicalVariable> varsToEnforce = new HashSet<>();
-        varsToEnforce.add(varToEnforce);
-        GroupByOperator op = (GroupByOperator) aggOp.accept(visitor, varsToEnforce);
-
-        // Checks the result.
-        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
-        expectedVarMap.put(varToEnforce, var);
-        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
-        VariableReferenceExpression keyExpr = (VariableReferenceExpression) op.getGroupByList().get(0).second
-                .getValue();
-        Assert.assertEquals(keyExpr.getVariableReference(), varToEnforce);
-        LogicalVariable expectedGbyVar = op.getGroupByList().get(0).first;
-        Assert.assertEquals(expectedGbyVar, var);
-    }
-
-    /**
-     * Tests the processing of two serial group-by operators in RecoverVariablesVisitor.
-     *
-     * @throws Exception
-     */
-    @Test
-    public void testTwoGroupbys() throws Exception {
-        // Constructs the group-by operators.
-        LogicalVariable keyVar = new LogicalVariable(1);
-        LogicalVariable keyExprVar = new LogicalVariable(2);
-        GroupByOperator gbyOp = new GroupByOperator();
-        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar))));
-        LogicalVariable keyVar2 = new LogicalVariable(2);
-        LogicalVariable keyExprVar2 = new LogicalVariable(3);
-        GroupByOperator gbyOp2 = new GroupByOperator();
-        gbyOp.getGroupByList().add(new Pair<LogicalVariable, Mutable<ILogicalExpression>>(keyVar2,
-                new MutableObject<ILogicalExpression>(new VariableReferenceExpression(keyExprVar2))));
-        gbyOp.getInputs().add(new MutableObject<ILogicalOperator>(gbyOp2));
-
-        // Constructs the visitor.
-        IOptimizationContext mockedContext = mock(IOptimizationContext.class);
-        EnforceVariablesVisitor visitor = new EnforceVariablesVisitor(mockedContext);
-
-        // Calls the visitor.
-        LogicalVariable varToEnforce = new LogicalVariable(4);
-        Set<LogicalVariable> varsToEnforce = new HashSet<>();
-        varsToEnforce.add(keyExprVar2);
-        varsToEnforce.add(varToEnforce);
-        GroupByOperator op = (GroupByOperator) gbyOp.accept(visitor, varsToEnforce);
-
-        // Checks the result.
-        Map<LogicalVariable, LogicalVariable> expectedVarMap = new HashMap<>();
-        expectedVarMap.put(keyExprVar2, keyVar);
-        Assert.assertEquals(expectedVarMap, visitor.getInputVariableToOutputVariableMap());
-        VariableReferenceExpression decorVarExpr = (VariableReferenceExpression) op.getDecorList().get(0).second
-                .getValue();
-        Assert.assertEquals(decorVarExpr.getVariableReference(), varToEnforce);
-        GroupByOperator op2 = (GroupByOperator) op.getInputs().get(0).getValue();
-        VariableReferenceExpression decorVarExpr2 = (VariableReferenceExpression) op2.getDecorList().get(0).second
-                .getValue();
-        Assert.assertEquals(decorVarExpr2.getVariableReference(), varToEnforce);
-    }
-
-}