You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:22 UTC

[09/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
new file mode 100644
index 0000000..2f080fb
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/CopyLimitDownRule.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StreamLimitPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class CopyLimitDownRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.LIMIT) {
+            return false;
+        }
+        LimitOperator limitOp = (LimitOperator) op;
+        if (!limitOp.isTopmostLimitOp()) {
+            return false;
+        }
+
+        List<LogicalVariable> limitUsedVars = new ArrayList<>();
+        VariableUtilities.getUsedVariables(limitOp, limitUsedVars);
+
+        Mutable<ILogicalOperator> safeOpRef = null;
+        Mutable<ILogicalOperator> candidateOpRef = limitOp.getInputs().get(0);
+
+        List<LogicalVariable> candidateProducedVars = new ArrayList<>();
+        while (true) {
+            candidateProducedVars.clear();
+            ILogicalOperator candidateOp = candidateOpRef.getValue();
+            LogicalOperatorTag candidateOpTag = candidateOp.getOperatorTag();
+            if (candidateOp.getInputs().size() > 1 || !candidateOp.isMap()
+                    || candidateOpTag == LogicalOperatorTag.SELECT || candidateOpTag == LogicalOperatorTag.LIMIT
+                    || !OperatorPropertiesUtil.disjoint(limitUsedVars, candidateProducedVars)) {
+                break;
+            }
+
+            safeOpRef = candidateOpRef;
+            candidateOpRef = safeOpRef.getValue().getInputs().get(0);
+        }
+
+        if (safeOpRef != null) {
+            ILogicalOperator safeOp = safeOpRef.getValue();
+            Mutable<ILogicalOperator> unsafeOpRef = safeOp.getInputs().get(0);
+            ILogicalOperator unsafeOp = unsafeOpRef.getValue();
+            LimitOperator limitCloneOp = null;
+            if (limitOp.getOffset().getValue() == null) {
+                limitCloneOp = new LimitOperator(limitOp.getMaxObjects().getValue(), false);
+            } else {
+                IFunctionInfo finfoAdd = context.getMetadataProvider().lookupFunction(
+                        AlgebricksBuiltinFunctions.NUMERIC_ADD);
+                List<Mutable<ILogicalExpression>> addArgs = new ArrayList<>();
+                addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getMaxObjects().getValue().cloneExpression()));
+                addArgs.add(new MutableObject<ILogicalExpression>(limitOp.getOffset().getValue().cloneExpression()));
+                ScalarFunctionCallExpression maxPlusOffset = new ScalarFunctionCallExpression(finfoAdd, addArgs);
+                limitCloneOp = new LimitOperator(maxPlusOffset, false);
+            }
+            limitCloneOp.setPhysicalOperator(new StreamLimitPOperator());
+            limitCloneOp.getInputs().add(new MutableObject<ILogicalOperator>(unsafeOp));
+            limitCloneOp.setExecutionMode(unsafeOp.getExecutionMode());
+            limitCloneOp.recomputeSchema();
+            unsafeOpRef.setValue(limitCloneOp);
+            context.computeAndSetTypeEnvironmentForOperator(limitCloneOp);
+            context.addToDontApplySet(this, limitOp);
+        }
+
+        return safeOpRef != null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
new file mode 100644
index 0000000..e93fdd1
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateGroupByEmptyKeyRule.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule lifts the nested plan (typically an aggregate) out of a group-by operator
+ * when the group-by operator groups on an empty key, i.e., its group-by variable list is empty.
+ * 
+ * @author yingyib
+ */
+public class EliminateGroupByEmptyKeyRule implements IAlgebraicRewriteRule {
+
+    /** This rule does all of its work in {@link #rewritePost}; the pre-visit never rewrites. */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Replaces a group-by that has no group-by variables, exactly one nested plan and exactly
+     * one root in that plan with the nested plan itself, wiring the group-by's input in at the
+     * bottom of the nested plan.
+     *
+     * @param opRef
+     *            reference to the operator being visited
+     * @param context
+     *            optimization context (not used by this rule)
+     * @return true if the group-by was replaced; false otherwise
+     * @throws AlgebricksException
+     *             declared by the interface; not thrown directly by this method
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.GROUP) {
+            return false;
+        }
+        GroupByOperator groupOp = (GroupByOperator) op;
+        List<LogicalVariable> groupVars = groupOp.getGbyVarList();
+        if (!groupVars.isEmpty()) {
+            return false;
+        }
+        // Exactly one nested plan with exactly one root is required; an empty list must be
+        // rejected as well, otherwise get(0) below would throw.
+        List<ILogicalPlan> nestedPlans = groupOp.getNestedPlans();
+        if (nestedPlans.size() != 1) {
+            return false;
+        }
+        ILogicalPlan nestedPlan = nestedPlans.get(0);
+        if (nestedPlan.getRoots().size() != 1) {
+            return false;
+        }
+        Mutable<ILogicalOperator> topOpRef = nestedPlan.getRoots().get(0);
+        ILogicalOperator topOp = topOpRef.getValue();
+        Mutable<ILogicalOperator> nestedTupleSourceRef = getNestedTupleSourceReference(topOpRef);
+        // Connect the nested top operator into the plan, in place of the group-by.
+        opRef.setValue(topOp);
+        // Connect the group-by's child operator into the plan, in place of the nested plan's leaf.
+        nestedTupleSourceRef.setValue(groupOp.getInputs().get(0).getValue());
+        return true;
+    }
+
+    /**
+     * Follows the first input of each operator, starting at the given reference, until it
+     * reaches an operator whose input list is null or empty, and returns the reference to it.
+     * The leaf is expected to be the nested tuple source; its tag is not verified here.
+     *
+     * @param nestedTopOperatorRef
+     *            reference to the root operator of the nested plan
+     * @return the reference that holds the leaf operator reached through first inputs
+     */
+    private Mutable<ILogicalOperator> getNestedTupleSourceReference(Mutable<ILogicalOperator> nestedTopOperatorRef) {
+        Mutable<ILogicalOperator> currentOpRef = nestedTopOperatorRef;
+        while (currentOpRef.getValue().getInputs() != null && !currentOpRef.getValue().getInputs().isEmpty()) {
+            currentOpRef = currentOpRef.getValue().getInputs().get(0);
+        }
+        return currentOpRef;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java
new file mode 100644
index 0000000..8a381f7
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanRule.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.LinkedList;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+public class EliminateSubplanRule implements IAlgebraicRewriteRule {
+
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    /**
+     * Eliminate Subplan above ETS
+     * and Subplan that has only ops. with one input and no free vars. (could we
+     * modify it to consider free vars which are sources of Unnest or Assign, if
+     * there are no aggregates?)
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+            return false;
+        }
+        SubplanOperator subplan = (SubplanOperator) op;
+
+        Mutable<ILogicalOperator> outerRef = subplan.getInputs().get(0);
+        AbstractLogicalOperator outerRefOp = (AbstractLogicalOperator) outerRef.getValue();
+        if (outerRefOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+            elimSubplanOverEts(opRef, context);
+            return true;
+        }
+        if (subplan.getNestedPlans().size() == 1 && subplan.getNestedPlans().get(0).getRoots().size() == 1
+                && !OperatorPropertiesUtil.hasFreeVariables(subplan)) {
+            if (elimOneSubplanWithNoFreeVars(opRef)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private boolean elimOneSubplanWithNoFreeVars(Mutable<ILogicalOperator> opRef) {
+        SubplanOperator subplan = (SubplanOperator) opRef.getValue();
+        AbstractLogicalOperator rootOp = (AbstractLogicalOperator) subplan.getNestedPlans().get(0).getRoots().get(0)
+                .getValue();
+        if (rootOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE
+                || rootOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+            opRef.setValue(subplan.getInputs().get(0).getValue());
+            return true;
+        } else {
+            AbstractLogicalOperator botOp = rootOp;
+            if (botOp.getInputs().size() != 1) {
+                return false;
+            }
+            do {
+                Mutable<ILogicalOperator> botRef = botOp.getInputs().get(0);
+                botOp = (AbstractLogicalOperator) botRef.getValue();
+                if (botOp.getOperatorTag() == LogicalOperatorTag.EMPTYTUPLESOURCE) {
+                    botRef.setValue(subplan.getInputs().get(0).getValue());
+                    opRef.setValue(rootOp);
+                    return true;
+                }
+            } while (botOp.getInputs().size() == 1);
+            return false;
+        }
+    }
+
+    private void elimSubplanOverEts(Mutable<ILogicalOperator> opRef, IOptimizationContext ctx)
+            throws AlgebricksException {
+        SubplanOperator subplan = (SubplanOperator) opRef.getValue();
+        for (ILogicalPlan p : subplan.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                OperatorManipulationUtil.ntsToEts(r, ctx);
+            }
+        }
+        LinkedList<Mutable<ILogicalOperator>> allRoots = subplan.allRootsInReverseOrder();
+        if (allRoots.size() == 1) {
+            opRef.setValue(allRoots.get(0).getValue());
+        } else {
+            ILogicalOperator topOp = null;
+            for (Mutable<ILogicalOperator> r : allRoots) {
+                if (topOp == null) {
+                    topOp = r.getValue();
+                } else {
+                    LeftOuterJoinOperator j = new LeftOuterJoinOperator(new MutableObject<ILogicalExpression>(
+                            ConstantExpression.TRUE));
+                    j.getInputs().add(new MutableObject<ILogicalOperator>(topOp));
+                    j.getInputs().add(r);
+                    ctx.setOutputTypeEnvironment(j, j.computeOutputTypeEnvironment(ctx));
+                    topOp = j;
+                }
+            }
+            opRef.setValue(topOp);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
new file mode 100644
index 0000000..1397956
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EliminateSubplanWithInputCardinalityOneRule.java
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * This rule eliminates a subplan with the following pattern:
+ * -- SUBPLAN
+ * -- OP (where OP produces exactly one tuple)
+ * The rewrite is only applied when the live variables at OP are not used after the SUBPLAN.
+ * Note: This rule must be applied after
+ * the RemoveRedundantVariablesRule (to avoid the lineage analysis of variable cardinality).
+ * 
+ * @author yingyib
+ */
+public class EliminateSubplanWithInputCardinalityOneRule implements IAlgebraicRewriteRule {
+    /** The reference passed to the first rewritePre call, treated as the topmost operator */
+    private Mutable<ILogicalOperator> rootRef;
+    /** Whether the rule has ever been invoked */
+    private boolean invoked = false;
+
+    /** The post-visit never rewrites; all work happens in {@link #rewritePre}. */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) {
+        return false;
+    }
+
+    /**
+     * Examines every input of the visited operator and removes those that are eligible subplans.
+     *
+     * @param opRef
+     *            reference to the operator being visited; remembered as the root on the first call
+     * @param context
+     *            optimization context (not used by this rule)
+     * @return true if at least one input subplan was eliminated; false otherwise
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        if (!invoked) {
+            rootRef = opRef;
+            invoked = true;
+        }
+        AbstractLogicalOperator parentOp = (AbstractLogicalOperator) opRef.getValue();
+        if (parentOp.getInputs().isEmpty()) {
+            return false;
+        }
+        boolean rewritten = false;
+        for (Mutable<ILogicalOperator> inputRef : parentOp.getInputs()) {
+            if (eliminateSubplan(inputRef)) {
+                rewritten = true;
+            }
+        }
+        return rewritten;
+    }
+
+    /**
+     * Tries to replace the subplan held by the given reference with the root of its nested plan.
+     * 
+     * @param subplanRef
+     *            reference to an input of the visited operator
+     * @return true if the reference held an eligible subplan and was rewritten; false otherwise
+     * @throws AlgebricksException
+     */
+    private boolean eliminateSubplan(Mutable<ILogicalOperator> subplanRef) throws AlgebricksException {
+        AbstractLogicalOperator inputOp = (AbstractLogicalOperator) subplanRef.getValue();
+        if (inputOp.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+            return false;
+        }
+
+        SubplanOperator subplan = (SubplanOperator) inputOp;
+        Set<LogicalVariable> varsUsedAbove = new ListSet<LogicalVariable>();
+        OperatorPropertiesUtil.getFreeVariablesInPath(rootRef.getValue(), subplan, varsUsedAbove);
+        // TODO(buyingyi): figure out the rewriting for subplan operators with multiple subplans.
+        if (subplan.getNestedPlans().size() != 1) {
+            return false;
+        }
+
+        ILogicalOperator subplanInputOp = subplan.getInputs().get(0).getValue();
+        Set<LogicalVariable> inputLiveVars = new ListSet<LogicalVariable>();
+        VariableUtilities.getLiveVariables(subplanInputOp, inputLiveVars);
+        int liveVarCount = inputLiveVars.size();
+        inputLiveVars.removeAll(varsUsedAbove);
+        // Makes sure the free variables are only used in the subplan: if anything was
+        // removed, some live variable of the input is also used above the subplan.
+        if (inputLiveVars.size() < liveVarCount) {
+            return false;
+        }
+
+        Set<LogicalVariable> subplanFreeVars = new ListSet<LogicalVariable>();
+        OperatorPropertiesUtil.getFreeVariablesInSubplans(subplan, subplanFreeVars);
+        if (!isCardinalityOne(subplan.getInputs().get(0), subplanFreeVars)) {
+            return false;
+        }
+
+        /** If the cardinality of freeVars in the subplan is one, the subplan can be removed. */
+        List<Mutable<ILogicalOperator>> nestedRoots = subplan.getNestedPlans().get(0).getRoots();
+        // TODO(buyingyi): investigate the case of multi-root plans.
+        if (nestedRoots.size() != 1) {
+            return false;
+        }
+        Mutable<ILogicalOperator> nestedRootRef = nestedRoots.get(0);
+        Set<Mutable<ILogicalOperator>> ntsRefs = new ListSet<Mutable<ILogicalOperator>>();
+        findNts(nestedRootRef, ntsRefs);
+
+        /** Replaces nts with the input operator of the subplan. */
+        for (Mutable<ILogicalOperator> ntsRef : ntsRefs) {
+            ntsRef.setValue(subplanInputOp);
+        }
+        subplanRef.setValue(nestedRootRef.getValue());
+        return true;
+    }
+
+    /**
+     * Whether the cardinality of each of the input free variables is one.
+     * 
+     * @param opRef
+     *            the operator to be checked (including its input operators)
+     * @param freeVars
+     *            variables to be checked for produced operators
+     * @return true if every input variable has cardinality one; false otherwise.
+     * @throws AlgebricksException
+     */
+    private boolean isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars)
+            throws AlgebricksException {
+        Set<LogicalVariable> cardinalityOneVars = new ListSet<LogicalVariable>();
+        Set<LogicalVariable> unnestAndJoinLiveVars = new ListSet<LogicalVariable>();
+        isCardinalityOne(opRef, freeVars, cardinalityOneVars, unnestAndJoinLiveVars);
+        // Variables that are live at an unnest or join no longer count as cardinality one.
+        cardinalityOneVars.removeAll(unnestAndJoinLiveVars);
+        return cardinalityOneVars.equals(freeVars);
+    }
+
+    /**
+     * Recursively adds the variables which have cardinality one and are in the input free variable set.
+     * 
+     * @param opRef
+     *            , the current operator reference.
+     * @param freeVars
+     *            , a set of variables.
+     * @param varsWithCardinalityOne
+     *            , variables in the free variable set with cardinality one at the time they are created.
+     * @param varsLiveAtUnnestAndJoin
+     *            , live variables at Unnest and Join. The cardinalities of those variables can become more than one
+     *            even if their cardinalities were one at the time those variables were created.
+     * @throws AlgebricksException
+     */
+    private void isCardinalityOne(Mutable<ILogicalOperator> opRef, Set<LogicalVariable> freeVars,
+            Set<LogicalVariable> varsWithCardinalityOne, Set<LogicalVariable> varsLiveAtUnnestAndJoin)
+            throws AlgebricksException {
+        AbstractLogicalOperator currentOp = (AbstractLogicalOperator) opRef.getValue();
+        List<LogicalVariable> producedHere = new ArrayList<LogicalVariable>();
+        VariableUtilities.getProducedVariables(currentOp, producedHere);
+
+        LogicalOperatorTag currentTag = currentOp.getOperatorTag();
+        if (currentTag == LogicalOperatorTag.UNNEST || currentTag == LogicalOperatorTag.INNERJOIN
+                || currentTag == LogicalOperatorTag.LEFTOUTERJOIN) {
+            VariableUtilities.getLiveVariables(currentOp, varsLiveAtUnnestAndJoin);
+        } else if (currentTag == LogicalOperatorTag.AGGREGATE) {
+            // Variables produced by an aggregate are recorded as cardinality one.
+            for (LogicalVariable aggVar : producedHere) {
+                if (freeVars.contains(aggVar)) {
+                    varsWithCardinalityOne.add(aggVar);
+                }
+            }
+        }
+
+        // Stop descending once as many variables were collected as there are free variables.
+        if (varsWithCardinalityOne.size() == freeVars.size()) {
+            return;
+        }
+        for (Mutable<ILogicalOperator> inputRef : currentOp.getInputs()) {
+            isCardinalityOne(inputRef, freeVars, varsWithCardinalityOne, varsLiveAtUnnestAndJoin);
+        }
+    }
+
+    /**
+     * Finds the NestedTupleSource operators among the direct/indirect input operators of opRef.
+     * 
+     * @param opRef
+     *            , the current operator reference.
+     * @param ntsSet
+     *            , the set of NestedTupleSource operator references.
+     */
+    private void findNts(Mutable<ILogicalOperator> opRef, Set<Mutable<ILogicalOperator>> ntsSet) {
+        List<Mutable<ILogicalOperator>> inputRefs = opRef.getValue().getInputs();
+        if (inputRefs.isEmpty()) {
+            // Leaf operator: keep it only if it is a nested tuple source.
+            AbstractLogicalOperator leafOp = (AbstractLogicalOperator) opRef.getValue();
+            if (leafOp.getOperatorTag() == LogicalOperatorTag.NESTEDTUPLESOURCE) {
+                ntsSet.add(opRef);
+            }
+            return;
+        }
+        for (Mutable<ILogicalOperator> inputRef : inputRefs) {
+            findNts(inputRef, ntsSet);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
new file mode 100644
index 0000000..d4834a3
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceOrderByAfterSubplan.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractLogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+/**
+ * If there is any ordering property before the subplan operator, the ordering should
+ * be kept after the subplan.
+ * This rule adds a redundant order operator after those cases, to guarantee the correctness.
+ * <p>
+ * For each SUBPLAN input of the visited operator, the rule follows the chain of first inputs below
+ * the subplan until it reaches an ORDER operator. If no order-breaking operator is met on the way,
+ * a copy of that ORDER operator is inserted between the visited operator and the subplan. The
+ * original ORDER operator is removed unless an order-sensitive operator (e.g. LIMIT) sits between
+ * it and the subplan.
+ *
+ * @author yingyib
+ */
+public class EnforceOrderByAfterSubplan implements IAlgebraicRewriteRule {
+    /** a set of order-breaking operators */
+    private final Set<LogicalOperatorTag> orderBreakingOps = new HashSet<LogicalOperatorTag>();
+    /** a set of order-sensitive operators */
+    private final Set<LogicalOperatorTag> orderSensitiveOps = new HashSet<LogicalOperatorTag>();
+
+    public EnforceOrderByAfterSubplan() {
+        // operators that break the ordering
+        orderBreakingOps.add(LogicalOperatorTag.INNERJOIN);
+        orderBreakingOps.add(LogicalOperatorTag.LEFTOUTERJOIN);
+        orderBreakingOps.add(LogicalOperatorTag.UNIONALL);
+        orderBreakingOps.add(LogicalOperatorTag.AGGREGATE);
+
+        // operators that are sensitive to the ordering
+        orderSensitiveOps.add(LogicalOperatorTag.LIMIT);
+    }
+
+    /**
+     * No-op: all the work of this rule is done in rewritePost.
+     *
+     * @return always false.
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Re-enforces, on top of every SUBPLAN input of the visited operator, the ordering found below
+     * that subplan.
+     * <p>
+     * Every operator inspected by this method is added to the don't-apply set of this rule.
+     * Processing stops at the first SUBPLAN input for which no usable ORDER operator is found
+     * (or whose order expressions are not all functional); rewrites already applied to earlier
+     * inputs are still reported through the return value.
+     *
+     * @param opRef
+     *            the reference of the operator whose inputs are examined.
+     * @param context
+     *            the optimization context.
+     * @return true if at least one order operator has been inserted, false otherwise.
+     * @throws AlgebricksException
+     *             if the type environment of a newly inserted order operator cannot be computed.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) opRef.getValue();
+        if (context.checkIfInDontApplySet(this, op1)) {
+            return false;
+        }
+        List<Mutable<ILogicalOperator>> inputs = op1.getInputs();
+        context.addToDontApplySet(this, op1);
+        if (op1.getOperatorTag() == LogicalOperatorTag.ORDER || inputs == null) {
+            /**
+             * does not apply if
+             * 1. there is yet-another order operator on-top-of the subplan, because the downstream order operator's ordering will be broken anyway
+             * 2. the input operator(s) is null
+             */
+            return false;
+        }
+        boolean changed = false;
+        for (int i = 0; i < inputs.size(); i++) {
+            Mutable<ILogicalOperator> inputOpRef = inputs.get(i);
+            AbstractLogicalOperator op = (AbstractLogicalOperator) inputOpRef.getValue();
+            context.addToDontApplySet(this, op);
+            if (op.getOperatorTag() != LogicalOperatorTag.SUBPLAN) {
+                continue;
+            }
+
+            /**
+             * check the order operators whose ordering is not broken before the subplan operator, and then
+             * duplicate them on-top-of the subplan operator
+             */
+            boolean foundTarget = true;
+            boolean orderSensitive = false;
+            Mutable<ILogicalOperator> childRef = op.getInputs().get(0);
+            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+            while (child.getOperatorTag() != LogicalOperatorTag.ORDER) {
+                context.addToDontApplySet(this, child);
+                // an order-breaking operator or a group-by ends the search: nothing to preserve
+                if (orderBreakingOps.contains(child.getOperatorTag())
+                        || child.getOperatorTag() == LogicalOperatorTag.GROUP) {
+                    foundTarget = false;
+                    break;
+                }
+                if (orderSensitiveOps.contains(child.getOperatorTag())) {
+                    orderSensitive = true;
+                }
+                List<Mutable<ILogicalOperator>> childInputs = child.getInputs();
+                if (childInputs == null || childInputs.size() > 2 || childInputs.size() < 1) {
+                    foundTarget = false;
+                    break;
+                }
+                // keep walking down the first input
+                childRef = childInputs.get(0);
+                child = (AbstractLogicalOperator) childRef.getValue();
+            }
+            /** the target order-by operator has not been found. */
+            if (!foundTarget) {
+                // report rewrites already applied to earlier inputs instead of claiming "no change"
+                return changed;
+            }
+
+            /** copy the original order-by operator and insert on-top-of the subplan operator */
+            context.addToDontApplySet(this, child);
+            OrderOperator sourceOrderOp = (OrderOperator) child;
+            for (Pair<IOrder, Mutable<ILogicalExpression>> expr : sourceOrderOp.getOrderExpressions()) {
+                if (!expr.second.getValue().isFunctional()) {
+                    // non-functional order expressions are not duplicated; still report earlier rewrites
+                    return changed;
+                }
+            }
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> orderExprs = deepCopyOrderAndExpression(sourceOrderOp
+                    .getOrderExpressions());
+            OrderOperator newOrderOp = new OrderOperator(orderExprs);
+            context.addToDontApplySet(this, newOrderOp);
+            inputs.set(i, new MutableObject<ILogicalOperator>(newOrderOp));
+            newOrderOp.getInputs().add(inputOpRef);
+            context.computeAndSetTypeEnvironmentForOperator(newOrderOp);
+
+            if (!orderSensitive) {
+                /** remove the original order-by */
+                childRef.setValue(sourceOrderOp.getInputs().get(0).getValue());
+            }
+            changed = true;
+        }
+        return changed;
+    }
+
+    /**
+     * Wraps a clone of the expression held by oldExpr into a new mutable reference.
+     *
+     * @param oldExpr
+     *            the reference of the expression to copy.
+     * @return a new reference holding the cloned expression.
+     */
+    private Mutable<ILogicalExpression> deepCopyExpressionRef(Mutable<ILogicalExpression> oldExpr) {
+        return new MutableObject<ILogicalExpression>(((AbstractLogicalExpression) oldExpr.getValue()).cloneExpression());
+    }
+
+    /**
+     * Copies a list of (order, expression) pairs; the order objects are shared, the expressions are cloned.
+     *
+     * @param ordersAndExprs
+     *            the pairs to copy.
+     * @return a new list with one copied pair per input pair, in the same sequence.
+     */
+    private List<Pair<IOrder, Mutable<ILogicalExpression>>> deepCopyOrderAndExpression(
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> ordersAndExprs) {
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> newOrdersAndExprs = new ArrayList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+        for (Pair<IOrder, Mutable<ILogicalExpression>> pair : ordersAndExprs) {
+            newOrdersAndExprs.add(new Pair<IOrder, Mutable<ILogicalExpression>>(pair.first,
+                    deepCopyExpressionRef(pair.second)));
+        }
+        return newOrdersAndExprs;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
new file mode 100644
index 0000000..ebd7da1
--- /dev/null
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -0,0 +1,614 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.rewriter.rules;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.OperatorAnnotations;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.BroadcastPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.ExternalGroupByPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.InMemoryStableSortPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.StableSortPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty.PartitioningType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import edu.uci.ics.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
+import edu.uci.ics.hyracks.dataflow.common.data.partition.range.IRangeMap;
+
+public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
+
+    private static final INodeDomain DEFAULT_DOMAIN = new DefaultNodeGroupDomain("__DEFAULT");
+
+    private PhysicalOptimizationConfig physicalOptimizationConfig;
+
+    /**
+     * No-op: this rule does not rewrite anything in the post-order pass and always reports
+     * that the plan is unchanged.
+     *
+     * @return always false.
+     */
+    @Override
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        return false;
+    }
+
+    /**
+     * Entry point of the rule: enforces the structural (partitioning and local) properties
+     * for the plan rooted at opRef.
+     * The rule is skipped while the operator has no physical operator, when the operator is
+     * already in the don't-apply set, or when a non-empty functional dependency list is
+     * already registered for it.
+     *
+     * @param opRef
+     *            the reference of the root operator to optimize.
+     * @param context
+     *            the optimization context.
+     * @return true if the plan has been changed, false otherwise.
+     */
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractLogicalOperator rootOp = (AbstractLogicalOperator) opRef.getValue();
+        // wait for the physical operators to be set first, and never process an operator twice
+        if (rootOp.getPhysicalOperator() == null || context.checkIfInDontApplySet(this, rootOp)) {
+            return false;
+        }
+
+        List<FunctionalDependency> knownFds = context.getFDList(rootOp);
+        boolean fdsAlreadyPresent = knownFds != null && !knownFds.isEmpty();
+        if (fdsAlreadyPresent) {
+            return false;
+        }
+        // FDs and equivalence classes are actually logical constraints, so they could be
+        // pre-computed somewhere else, too.
+
+        physicalOptimizationConfig = context.getPhysicalOptimizationConfig();
+        AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Optimizing operator " + rootOp.getPhysicalOperator() + ".\n");
+
+        PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(rootOp, context);
+
+        // the root is required to deliver nothing more than a random partitioning with no local properties
+        IPartitioningProperty rootPartitioning = new RandomPartitioningProperty(null);
+        List<ILocalStructuralProperty> rootLocalProps = new LinkedList<ILocalStructuralProperty>();
+        StructuralPropertiesVector rootRequirements = new StructuralPropertiesVector(rootPartitioning, rootLocalProps);
+
+        boolean planChanged = physOptimizeOp(opRef, rootRequirements, false, context);
+        rootOp.computeDeliveredPhysicalProperties(context);
+        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for " + rootOp.getPhysicalOperator()
+                + ": " + rootOp.getDeliveredPhysicalProperties() + "\n");
+
+        // opRef may now hold a different operator than rootOp, so mark whatever it currently holds
+        context.addToDontApplySet(this, opRef.getValue());
+
+        return planChanged;
+    }
+
+    /**
+     * Runs the physical optimization on every root of a plan and recomputes the delivered
+     * physical properties of each root afterwards.
+     *
+     * @param plan
+     *            the plan whose roots are optimized.
+     * @param pvector
+     *            the properties required from each root.
+     * @param nestedPlan
+     *            whether the plan is a nested plan.
+     * @param context
+     *            the optimization context.
+     * @return true if any root has been changed, false otherwise.
+     */
+    private boolean physOptimizePlan(ILogicalPlan plan, IPhysicalPropertiesVector pvector, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+        boolean anyRootChanged = false;
+        for (Mutable<ILogicalOperator> rootRef : plan.getRoots()) {
+            anyRootChanged |= physOptimizeOp(rootRef, pvector, nestedPlan, context);
+
+            // read the root again: the optimization may have replaced the operator held by rootRef
+            AbstractLogicalOperator rootOp = (AbstractLogicalOperator) rootRef.getValue();
+            rootOp.computeDeliveredPhysicalProperties(context);
+            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Structural properties for "
+                    + rootOp.getPhysicalOperator() + ": " + rootOp.getDeliveredPhysicalProperties() + "\n");
+        }
+        return anyRootChanged;
+    }
+
+    /**
+     * Recursively enforces the required physical properties on the sub-plan rooted at opRef.
+     * <p>
+     * The method works in four steps:
+     * 1. it optimizes every input recursively with the properties this operator requires from it,
+     * and determines the node domain shared by the inputs;
+     * 2. for every input, it computes the required properties that the input does not deliver and
+     * adds enforcers for them;
+     * 3. it optimizes the nested plans, if any;
+     * 4. if the operator turned out to be a redundant stable sort, it replaces the operator by its
+     * input and optimizes the replacement again.
+     *
+     * @param opRef
+     *            the reference of the operator to optimize; its value is replaced when the operator
+     *            is a redundant sort.
+     * @param required
+     *            the properties required from the operator by its parent.
+     * @param nestedPlan
+     *            whether the operator belongs to a nested plan.
+     * @param context
+     *            the optimization context.
+     * @return true if the plan has been changed, false otherwise.
+     */
+    private boolean physOptimizeOp(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector required,
+            boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
+
+        boolean changed = false;
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+        optimizeUsingConstraintsAndEquivClasses(op);
+        PhysicalRequirements pr = op.getRequiredPhysicalPropertiesForChildren(required);
+        IPhysicalPropertiesVector[] reqdProperties = null;
+        if (pr != null) {
+            reqdProperties = pr.getRequiredProperties();
+        }
+        // NOTE(review): when pr is null, reqdProperties stays null but is indexed in both loops
+        // below, and pr itself is dereferenced in the second loop; this presumably relies on
+        // every operator that has inputs returning non-null requirements -- confirm.
+        boolean opIsRedundantSort = false;
+
+        // compute properties and figure out the domain
+        INodeDomain childrenDomain = null;
+        {
+            int j = 0;
+            for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+                AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+                // recursive call
+                if (physOptimizeOp(childRef, reqdProperties[j], nestedPlan, context)) {
+                    changed = true;
+                }
+                child.computeDeliveredPhysicalProperties(context);
+                IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
+                // the first child fixes the domain; any child with a different domain
+                // makes the operator fall back to the default domain
+                if (childrenDomain == null) {
+                    childrenDomain = delivered.getPartitioningProperty().getNodeDomain();
+                } else {
+                    INodeDomain dom2 = delivered.getPartitioningProperty().getNodeDomain();
+                    if (!childrenDomain.sameAs(dom2)) {
+                        childrenDomain = DEFAULT_DOMAIN;
+                    }
+                }
+                j++;
+            }
+        }
+
+        // required partitioning properties without a node domain inherit the children's domain
+        if (reqdProperties != null) {
+            for (int k = 0; k < reqdProperties.length; k++) {
+                IPhysicalPropertiesVector pv = reqdProperties[k];
+                IPartitioningProperty pp = pv.getPartitioningProperty();
+                if (pp != null && pp.getNodeDomain() == null) {
+                    pp.setNodeDomain(childrenDomain);
+                }
+            }
+        }
+
+        // partitioning delivered by the first child found to be (un)ordered-partitioned; it is
+        // passed to the partitioning coordinator when handling the following children
+        IPartitioningProperty firstDeliveredPartitioning = null;
+        int i = 0;
+        for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+            AbstractLogicalOperator child = (AbstractLogicalOperator) childRef.getValue();
+            IPhysicalPropertiesVector delivered = child.getDeliveredPhysicalProperties();
+
+            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Properties delivered by " + child.getPhysicalOperator()
+                    + ": " + delivered + "\n");
+            IPartitioningRequirementsCoordinator prc = pr.getPartitioningCoordinator();
+            Pair<Boolean, IPartitioningProperty> pbpp = prc.coordinateRequirements(
+                    reqdProperties[i].getPartitioningProperty(), firstDeliveredPartitioning, op, context);
+            boolean mayExpandPartitioningProperties = pbpp.first;
+            // effective requirement: coordinated partitioning plus the original local properties
+            IPhysicalPropertiesVector rqd = new StructuralPropertiesVector(pbpp.second,
+                    reqdProperties[i].getLocalProperties());
+
+            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for " + child.getPhysicalOperator()
+                    + ": " + rqd + "\n");
+            // diff holds the required properties that the child does not deliver (null if none)
+            IPhysicalPropertiesVector diff = delivered.getUnsatisfiedPropertiesFrom(rqd,
+                    mayExpandPartitioningProperties, context.getEquivalenceClassMap(child), context.getFDList(child));
+
+            if (isRedundantSort(opRef, delivered, diff, context)) {
+                opIsRedundantSort = true;
+            }
+
+            if (diff != null) {
+                changed = true;
+                addEnforcers(op, i, diff, rqd, delivered, childrenDomain, nestedPlan, context);
+
+                AbstractLogicalOperator newChild = ((AbstractLogicalOperator) op.getInputs().get(i).getValue());
+
+                // an enforcer has been put on top of the child: re-check against what it delivers
+                if (newChild != child) {
+                    delivered = newChild.getDeliveredPhysicalProperties();
+                    IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, rqd,
+                            mayExpandPartitioningProperties, context);
+                    AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");
+
+                    if (isRedundantSort(opRef, delivered, newDiff, context)) {
+                        opIsRedundantSort = true;
+                        // the remaining children are not processed in this pass
+                        break;
+                    }
+                }
+
+            }
+            if (firstDeliveredPartitioning == null) {
+                IPartitioningProperty dpp = delivered.getPartitioningProperty();
+                if (dpp.getPartitioningType() == PartitioningType.ORDERED_PARTITIONED
+                        || dpp.getPartitioningType() == PartitioningType.UNORDERED_PARTITIONED) {
+                    firstDeliveredPartitioning = dpp;
+                }
+            }
+
+            i++;
+        }
+
+        // nested plans are optimized with the same requirements as this operator
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans nested = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : nested.getNestedPlans()) {
+                if (physOptimizePlan(p, required, true, context)) {
+                    changed = true;
+                }
+            }
+        }
+
+        if (opIsRedundantSort) {
+            if (AlgebricksConfig.DEBUG) {
+                AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Removing redundant SORT operator "
+                        + op.getPhysicalOperator() + "\n");
+                printOp(op);
+            }
+            changed = true;
+            // replace the sort by its input; if that input is a PROJECT, the input of the
+            // PROJECT is used instead
+            AbstractLogicalOperator nextOp = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            if (nextOp.getOperatorTag() == LogicalOperatorTag.PROJECT) {
+                nextOp = (AbstractLogicalOperator) nextOp.getInputs().get(0).getValue();
+            }
+            opRef.setValue(nextOp);
+            // Now, transfer annotations from the original sort op. to this one.
+            AbstractLogicalOperator transferTo = nextOp;
+            if (transferTo.getOperatorTag() == LogicalOperatorTag.EXCHANGE) {
+                // the annotations go to the operator below the EXCHANGE rather than to the
+                // EXCHANGE itself (original note: "remove duplicate exchange operator")
+                transferTo = (AbstractLogicalOperator) transferTo.getInputs().get(0).getValue();
+            }
+            transferTo.getAnnotations().putAll(op.getAnnotations());
+            // optimize the replacement against the same requirements; its result is not
+            // needed because changed is already true
+            physOptimizeOp(opRef, required, nestedPlan, context);
+        }
+        return changed;
+    }
+
+    /**
+     * Computes the required properties that a newly inserted child does not deliver.
+     * If the context has no equivalence classes or no functional dependencies for the new
+     * child yet, they are computed first by visiting the child.
+     *
+     * @param newChild
+     *            the newly inserted child operator.
+     * @param required
+     *            the properties required from the child.
+     * @param mayExpandPartitioningProperties
+     *            forwarded as-is to the unsatisfied-properties computation.
+     * @param context
+     *            the optimization context.
+     * @return the result of getUnsatisfiedPropertiesFrom on the properties delivered by the child.
+     */
+    private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
+            IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context)
+            throws AlgebricksException {
+        IPhysicalPropertiesVector deliveredByNewChild = newChild.getDeliveredPhysicalProperties();
+
+        Map<LogicalVariable, EquivalenceClass> eqClasses = context.getEquivalenceClassMap(newChild);
+        List<FunctionalDependency> fds = context.getFDList(newChild);
+        boolean logicalPropsMissing = (eqClasses == null) || (fds == null);
+        if (logicalPropsMissing) {
+            // let the visitor register the FDs and equivalence classes, then read them back
+            newChild.accept(new FDsAndEquivClassesVisitor(), context);
+            eqClasses = context.getEquivalenceClassMap(newChild);
+            fds = context.getFDList(newChild);
+        }
+        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> Required properties for new op. "
+                + newChild.getPhysicalOperator() + ": " + required + "\n");
+
+        return deliveredByNewChild.getUnsatisfiedPropertiesFrom(required, mayExpandPartitioningProperties,
+                eqClasses, fds);
+    }
+
+    private void optimizeUsingConstraintsAndEquivClasses(AbstractLogicalOperator op) {
+        IPhysicalOperator pOp = op.getPhysicalOperator();
+        switch (pOp.getOperatorTag()) {
+            case HASH_GROUP_BY:
+            case EXTERNAL_GROUP_BY: {
+                GroupByOperator gby = (GroupByOperator) op;
+                ExternalGroupByPOperator hgbyOp = (ExternalGroupByPOperator) pOp;
+                hgbyOp.computeColumnSet(gby.getGroupByList());
+                break;
+            }
+            case PRE_CLUSTERED_GROUP_BY: {
+                GroupByOperator gby = (GroupByOperator) op;
+                PreclusteredGroupByPOperator preSortedGby = (PreclusteredGroupByPOperator) pOp;
+                preSortedGby.setGbyColumns(gby.getGbyVarList());
+                break;
+            }
+            case PRE_SORTED_DISTINCT_BY: {
+                DistinctOperator d = (DistinctOperator) op;
+                PreSortedDistinctByPOperator preSortedDistinct = (PreSortedDistinctByPOperator) pOp;
+                preSortedDistinct.setDistinctByColumns(d.getDistinctByVarList());
+                break;
+            }
+        }
+    }
+
+    /**
+     * Derives order columns from the delivered local properties, provided that at least one
+     * delivered variable is also a required variable.
+     * The order columns are taken from the first delivered property, which must be a
+     * LocalOrderProperty; one column is copied per delivered variable.
+     *
+     * @param reqd
+     *            the required local properties.
+     * @param dlvd
+     *            the delivered local properties; the first one is cast to LocalOrderProperty.
+     * @return copies of the delivered order columns, or an empty list when no delivered
+     *         variable is required.
+     */
+    private List<OrderColumn> getOrderColumnsFromGroupingProperties(List<ILocalStructuralProperty> reqd,
+            List<ILocalStructuralProperty> dlvd) {
+        List<OrderColumn> result = new ArrayList<OrderColumn>();
+        List<LogicalVariable> requiredVars = new ArrayList<LogicalVariable>();
+        List<LogicalVariable> deliveredVars = new ArrayList<LogicalVariable>();
+        for (ILocalStructuralProperty requiredProp : reqd) {
+            requiredProp.getVariables(requiredVars);
+        }
+        for (ILocalStructuralProperty deliveredProp : dlvd) {
+            deliveredProp.getVariables(deliveredVars);
+        }
+
+        // index of the last delivered variable that is also required, or -1 if there is none
+        int lastRequired = deliveredVars.size() - 1;
+        while (lastRequired >= 0 && !requiredVars.contains(deliveredVars.get(lastRequired))) {
+            lastRequired--;
+        }
+
+        List<OrderColumn> deliveredOrder = ((LocalOrderProperty) dlvd.get(0)).getOrderColumns();
+        if (lastRequired < 0) {
+            return result;
+        }
+        // columns up to lastRequired are kept, and the other order columns are maintained
+        // after them, so every delivered position is copied
+        for (int pos = 0; pos < deliveredVars.size(); pos++) {
+            OrderColumn source = deliveredOrder.get(pos);
+            result.add(new OrderColumn(source.getColumn(), source.getOrder()));
+        }
+        return result;
+    }
+
+    /*
+     * Tells whether the operator held by opRef is a stable sort (regular or in-memory) whose
+     * order property is already matched by the local properties delivered below it.
+     * We assume delivered to be already normalized.
+     * diffOfProperties is not consulted.
+     */
+    private boolean isRedundantSort(Mutable<ILogicalOperator> opRef, IPhysicalPropertiesVector delivered,
+            IPhysicalPropertiesVector diffOfProperties, IOptimizationContext context) {
+        AbstractLogicalOperator candidate = (AbstractLogicalOperator) opRef.getValue();
+        if (candidate.getOperatorTag() != LogicalOperatorTag.ORDER) {
+            return false;
+        }
+        PhysicalOperatorTag physicalTag = candidate.getPhysicalOperator().getOperatorTag();
+        boolean isStableSort = physicalTag == PhysicalOperatorTag.STABLE_SORT
+                || physicalTag == PhysicalOperatorTag.IN_MEMORY_STABLE_SORT;
+        if (!isStableSort || delivered.getLocalProperties() == null) {
+            return false;
+        }
+
+        AbstractStableSortPOperator stableSort = (AbstractStableSortPOperator) candidate.getPhysicalOperator();
+        stableSort.computeLocalProperties(candidate);
+        ILocalStructuralProperty sortOrder = stableSort.getOrderProperty();
+        // redundant when what is delivered below already matches the order produced by the sort
+        return PropertiesUtil.matchLocalProperties(Collections.singletonList(sortOrder),
+                delivered.getLocalProperties(), context.getEquivalenceClassMap(candidate),
+                context.getFDList(candidate));
+    }
+
+    /**
+     * Adds the enforcers needed for the child at childIndex to deliver the missing properties.
+     * <p>
+     * When no partitioning is missing, or the missing partitioning is UNPARTITIONED, the local
+     * enforcers are added first and the partitioning enforcers are then computed against what
+     * the (possibly new) child at childIndex delivers. Otherwise the partitioning enforcers are
+     * added first, and local enforcers are added for whatever is still unsatisfied afterwards.
+     *
+     * @param op
+     *            the parent operator whose input is to be enforced.
+     * @param childIndex
+     *            the index of the input of op that is to be enforced.
+     * @param diffPropertiesVector
+     *            the required properties that the child does not deliver.
+     * @param required
+     *            the properties required from the child.
+     * @param deliveredByChild
+     *            the properties currently delivered by the child.
+     * @param domain
+     *            the node domain forwarded to the partitioning enforcers.
+     * @param nestedPlan
+     *            whether op belongs to a nested plan.
+     * @param context
+     *            the optimization context.
+     */
+    private void addEnforcers(AbstractLogicalOperator op, int childIndex,
+            IPhysicalPropertiesVector diffPropertiesVector, IPhysicalPropertiesVector required,
+            IPhysicalPropertiesVector deliveredByChild, INodeDomain domain, boolean nestedPlan,
+            IOptimizationContext context) throws AlgebricksException {
+
+        IPartitioningProperty pp = diffPropertiesVector.getPartitioningProperty();
+        if (pp == null || pp.getPartitioningType() == PartitioningType.UNPARTITIONED) {
+            addLocalEnforcers(op, childIndex, diffPropertiesVector.getLocalProperties(), nestedPlan, context);
+            // read the properties of the input that has just been enforced, i.e. the one at
+            // childIndex (not always input 0, which would be a different child for non-first inputs)
+            IPhysicalPropertiesVector deliveredByNewChild = ((AbstractLogicalOperator) op.getInputs()
+                    .get(childIndex).getValue()).getDeliveredPhysicalProperties();
+            addPartitioningEnforcers(op, childIndex, pp, required, deliveredByNewChild, domain, context);
+        } else {
+            addPartitioningEnforcers(op, childIndex, pp, required, deliveredByChild, domain, context);
+            AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
+            // the new partitioning may not deliver the local properties: check what is still missing
+            IPhysicalPropertiesVector newDiff = newPropertiesDiff(newChild, required, true, context);
+            AlgebricksConfig.ALGEBRICKS_LOGGER.finest(">>>> New properties diff: " + newDiff + "\n");
+            if (newDiff != null) {
+                addLocalEnforcers(op, childIndex, newDiff.getLocalProperties(), nestedPlan, context);
+            }
+        }
+    }
+
+    /**
+     * Puts order enforcers on top of the i-th input of op so that the given local properties
+     * are delivered. Order properties are enforced as they are; a grouping property is turned
+     * into an ascending order property over its preferred order enforcer if there is one, or
+     * over its column set otherwise.
+     * Nothing is done when localProperties is null or empty.
+     *
+     * @param op
+     *            the parent operator.
+     * @param i
+     *            the index of the input of op to enforce.
+     * @param localProperties
+     *            the local properties to enforce.
+     * @param nestedPlan
+     *            whether op belongs to a nested plan.
+     * @param context
+     *            the optimization context.
+     * @throws IllegalStateException
+     *             if a property is neither an order property nor a grouping property.
+     */
+    private void addLocalEnforcers(AbstractLogicalOperator op, int i, List<ILocalStructuralProperty> localProperties,
+            boolean nestedPlan, IOptimizationContext context) throws AlgebricksException {
+        if (AlgebricksConfig.DEBUG) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Adding local enforcers for local props = " + localProperties
+                    + "\n");
+        }
+
+        if (localProperties == null || localProperties.isEmpty()) {
+            return;
+        }
+
+        // start from a fresh reference onto the current i-th input
+        Mutable<ILogicalOperator> enforcedRef = new MutableObject<ILogicalOperator>(op.getInputs().get(i)
+                .getValue());
+        LinkedList<LocalOrderProperty> ordersToEnforce = new LinkedList<LocalOrderProperty>();
+
+        for (ILocalStructuralProperty property : localProperties) {
+            switch (property.getPropertyType()) {
+                case LOCAL_ORDER_PROPERTY:
+                    ordersToEnforce.add((LocalOrderProperty) property);
+                    break;
+                case LOCAL_GROUPING_PROPERTY:
+                    LocalGroupingProperty grouping = (LocalGroupingProperty) property;
+                    Collection<LogicalVariable> groupingVars;
+                    if (grouping.getPreferredOrderEnforcer() != null) {
+                        groupingVars = grouping.getPreferredOrderEnforcer();
+                    } else {
+                        groupingVars = grouping.getColumnSet();
+                    }
+                    // a grouping is enforced by sorting on its variables, in ascending order
+                    List<OrderColumn> ascendingColumns = new ArrayList<OrderColumn>();
+                    for (LogicalVariable groupingVar : groupingVars) {
+                        ascendingColumns.add(new OrderColumn(groupingVar, OrderKind.ASC));
+                    }
+                    ordersToEnforce.add(new LocalOrderProperty(ascendingColumns));
+                    break;
+                default:
+                    throw new IllegalStateException();
+            }
+        }
+        if (!ordersToEnforce.isEmpty()) {
+            enforcedRef = enforceOrderProperties(ordersToEnforce, enforcedRef, nestedPlan, context);
+        }
+
+        op.getInputs().set(i, enforcedRef);
+        AbstractLogicalOperator enforcedOp = (AbstractLogicalOperator) enforcedRef.getValue();
+        OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(enforcedOp, context);
+        printOp((AbstractLogicalOperator) enforcedRef.getValue());
+    }
+
+    /**
+     * Builds one local sort operator on top of {@code topOp} whose sort keys are the columns of all the given
+     * order properties, concatenated in list order.
+     *
+     * @param oList
+     *            the order properties to enforce
+     * @param topOp
+     *            reference to the operator that becomes the input of the new sort
+     * @param isMicroOp
+     *            when {@code true} the sort uses {@code InMemoryStableSortPOperator}; otherwise it uses
+     *            {@code StableSortPOperator} configured with the maximum number of external sort frames
+     * @param context
+     *            the optimization context, used to compute the type environment of the new sort
+     * @return a new reference holding the created sort operator
+     * @throws AlgebricksException
+     *             propagated from the type environment computation
+     */
+    private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
+            Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
+            throws AlgebricksException {
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> sortKeys = new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+        for (LocalOrderProperty requestedOrder : oList) {
+            for (OrderColumn column : requestedOrder.getOrderColumns()) {
+                IOrder direction;
+                if (column.getOrder() == OrderKind.ASC) {
+                    direction = OrderOperator.ASC_ORDER;
+                } else {
+                    direction = OrderOperator.DESC_ORDER;
+                }
+                Mutable<ILogicalExpression> keyExpr = new MutableObject<ILogicalExpression>(
+                        new VariableReferenceExpression(column.getColumn()));
+                sortKeys.add(new Pair<IOrder, Mutable<ILogicalExpression>>(direction, keyExpr));
+            }
+        }
+
+        OrderOperator sortOp = new OrderOperator(sortKeys);
+        sortOp.setExecutionMode(AbstractLogicalOperator.ExecutionMode.LOCAL);
+        if (!isMicroOp) {
+            sortOp.setPhysicalOperator(new StableSortPOperator(physicalOptimizationConfig.getMaxFramesExternalSort()));
+        } else {
+            sortOp.setPhysicalOperator(new InMemoryStableSortPOperator());
+        }
+        sortOp.getInputs().add(topOp);
+        context.computeAndSetTypeEnvironmentForOperator(sortOp);
+
+        if (AlgebricksConfig.DEBUG) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added sort enforcer " + sortOp.getPhysicalOperator() + ".\n");
+        }
+        return new MutableObject<ILogicalOperator>(sortOp);
+    }
+
+    /**
+     * Inserts an exchange operator between {@code op} and its {@code i}-th input so that the input is
+     * delivered with the partitioning {@code pp}. Nothing is done when {@code pp} is {@code null}.
+     * <p>
+     * The physical operator of the exchange depends on the partitioning type:
+     * <ul>
+     * <li>UNPARTITIONED: a random merge when the child delivers no usable order columns; otherwise a range
+     * partition merge if {@code op} carries the USE_RANGE_CONNECTOR annotation, else a sort merge on the
+     * child's order columns.</li>
+     * <li>UNORDERED_PARTITIONED: a hash partition merge when the child's local properties are all order
+     * properties and match the required local properties; otherwise a plain hash partition.</li>
+     * <li>ORDERED_PARTITIONED: a range partition (created with a {@code null} third argument).</li>
+     * <li>BROADCAST: a broadcast over {@code domain}.</li>
+     * <li>RANDOM: a random partition over the node domain of the property itself.</li>
+     * </ul>
+     *
+     * @param op
+     *            the operator whose input gets the exchange
+     * @param i
+     *            index of that input
+     * @param pp
+     *            the partitioning to enforce, may be {@code null}
+     * @param required
+     *            the physical properties required from the input
+     * @param deliveredByChild
+     *            the physical properties the input currently delivers
+     * @param domain
+     *            the node domain handed to the created connectors
+     * @param context
+     *            the optimization context
+     * @throws AlgebricksException
+     *             propagated from the property and type environment computations
+     * @throws NotImplementedException
+     *             for any other partitioning type
+     */
+    private void addPartitioningEnforcers(ILogicalOperator op, int i, IPartitioningProperty pp,
+            IPhysicalPropertiesVector required, IPhysicalPropertiesVector deliveredByChild, INodeDomain domain,
+            IOptimizationContext context) throws AlgebricksException {
+        if (pp == null) {
+            return;
+        }
+
+        IPhysicalOperator connector;
+        switch (pp.getPartitioningType()) {
+            case UNPARTITIONED: {
+                List<OrderColumn> childOrder = computeOrderColumns(deliveredByChild);
+                if (childOrder != null && !childOrder.isEmpty()) {
+                    if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
+                        IRangeMap rangeMap = (IRangeMap) op.getAnnotations().get(
+                                OperatorAnnotations.USE_RANGE_CONNECTOR);
+                        connector = new RangePartitionMergePOperator(childOrder, domain, rangeMap);
+                    } else {
+                        OrderColumn[] sortColumns = childOrder.toArray(new OrderColumn[childOrder.size()]);
+                        connector = new SortMergeExchangePOperator(sortColumns);
+                    }
+                } else {
+                    // No order to preserve while merging the partitions.
+                    connector = new RandomMergeExchangePOperator();
+                }
+                break;
+            }
+            case UNORDERED_PARTITIONED: {
+                List<LogicalVariable> partitionVars = new ArrayList<LogicalVariable>(
+                        ((UnorderedPartitionedProperty) pp).getColumnSet());
+                List<ILocalStructuralProperty> childLocals = deliveredByChild.getLocalProperties();
+                List<ILocalStructuralProperty> requiredLocals = required.getLocalProperties();
+                IPhysicalOperator mergingConnector = null;
+                if (requiredLocals != null && childLocals != null && allAreOrderProps(childLocals)) {
+                    AbstractLogicalOperator child = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
+                    Map<LogicalVariable, EquivalenceClass> ecs = context.getEquivalenceClassMap(child);
+                    List<FunctionalDependency> fds = context.getFDList(child);
+                    if (PropertiesUtil.matchLocalProperties(requiredLocals, childLocals, ecs, fds)) {
+                        List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(requiredLocals,
+                                childLocals);
+                        mergingConnector = new HashPartitionMergeExchangePOperator(orderColumns, partitionVars,
+                                domain);
+                    }
+                }
+                if (mergingConnector != null) {
+                    connector = mergingConnector;
+                } else {
+                    connector = new HashPartitionExchangePOperator(partitionVars, domain);
+                }
+                break;
+            }
+            case ORDERED_PARTITIONED: {
+                connector = new RangePartitionPOperator(((OrderedPartitionedProperty) pp).getOrderColumns(), domain,
+                        null);
+                break;
+            }
+            case BROADCAST: {
+                connector = new BroadcastPOperator(domain);
+                break;
+            }
+            case RANDOM: {
+                // The random partitioning property carries its own node domain.
+                INodeDomain targetDomain = ((RandomPartitioningProperty) pp).getNodeDomain();
+                connector = new RandomPartitionPOperator(targetDomain);
+                break;
+            }
+            default: {
+                throw new NotImplementedException("Enforcer for " + pp.getPartitioningType()
+                        + " partitioning type has not been implemented.");
+            }
+        }
+
+        // Splice the exchange in; the order of the calls below is kept exactly as it was.
+        Mutable<ILogicalOperator> childRef = op.getInputs().get(i);
+        ExchangeOperator exchange = new ExchangeOperator();
+        exchange.setPhysicalOperator(connector);
+        setNewOp(childRef, exchange, context);
+        exchange.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(exchange, context);
+        context.computeAndSetTypeEnvironmentForOperator(exchange);
+        if (AlgebricksConfig.DEBUG) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added partitioning enforcer "
+                    + exchange.getPhysicalOperator() + ".\n");
+            printOp((AbstractLogicalOperator) op);
+        }
+    }
+
+    /**
+     * Tells whether the given list is non-empty and consists only of local order properties.
+     *
+     * @param cldLocals
+     *            the local properties to inspect, must not be {@code null}
+     * @return {@code true} iff there is at least one property and every property is a local order property
+     */
+    private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
+        if (cldLocals.isEmpty()) {
+            return false;
+        }
+        for (ILocalStructuralProperty candidate : cldLocals) {
+            boolean isOrder = candidate.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY;
+            if (!isOrder) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Logs a pretty-printed form of the given operator at FINE level.
+     * <p>
+     * The operator is only pretty-printed when FINE messages are actually loggable, so callers that invoke
+     * this method unconditionally do not pay for building a string that would be thrown away.
+     *
+     * @param op
+     *            the operator to print
+     * @throws AlgebricksException
+     *             propagated from the pretty printer
+     */
+    private void printOp(AbstractLogicalOperator op) throws AlgebricksException {
+        // NOTE(review): assumes ALGEBRICKS_LOGGER is a java.util.logging.Logger, as its fine()/finest() calls
+        // suggest. Level is fully qualified so that no import has to be added to the file.
+        if (!AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(java.util.logging.Level.FINE)) {
+            return;
+        }
+        StringBuilder sb = new StringBuilder();
+        LogicalOperatorPrettyPrintVisitor pvisitor = new LogicalOperatorPrettyPrintVisitor();
+        PlanPrettyPrinter.printOperator(op, sb, pvisitor, 0);
+        AlgebricksConfig.ALGEBRICKS_LOGGER.fine(sb.toString());
+    }
+
+    /**
+     * Collects the order columns of all local properties of the given properties vector.
+     *
+     * @param pv
+     *            the physical properties vector to read the local properties from
+     * @return the concatenated order columns, in property order, when every local property is a local order
+     *         property; {@code null} when there are no local properties or when any of them is not a local
+     *         order property
+     */
+    private List<OrderColumn> computeOrderColumns(IPhysicalPropertiesVector pv) {
+        List<ILocalStructuralProperty> locals = pv.getLocalProperties();
+        if (locals == null || locals.isEmpty()) {
+            return null;
+        }
+
+        List<OrderColumn> collected = new ArrayList<OrderColumn>();
+        for (ILocalStructuralProperty local : locals) {
+            if (local.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
+                // A single non-order property makes the whole result unusable.
+                return null;
+            }
+            collected.addAll(((LocalOrderProperty) local).getOrderColumns());
+        }
+        return collected;
+    }
+
+    /**
+     * Puts {@code newOp} into {@code opRef} and makes the operator previously held there an input of
+     * {@code newOp}. Afterwards the schema, delivered physical properties, type environment, and functional
+     * dependencies / equivalence classes of {@code newOp} are computed, in that order.
+     *
+     * @param opRef
+     *            the reference whose value is replaced by {@code newOp}
+     * @param newOp
+     *            the operator to insert above the current value of {@code opRef}
+     * @param context
+     *            the optimization context
+     * @throws AlgebricksException
+     *             propagated from the property computations
+     */
+    private void setNewOp(Mutable<ILogicalOperator> opRef, AbstractLogicalOperator newOp, IOptimizationContext context)
+            throws AlgebricksException {
+        // Wrap the displaced operator in its own reference before the shared one is overwritten.
+        Mutable<ILogicalOperator> displacedRef = new MutableObject<ILogicalOperator>(opRef.getValue());
+        opRef.setValue(newOp);
+        newOp.getInputs().add(displacedRef);
+
+        newOp.recomputeSchema();
+        newOp.computeDeliveredPhysicalProperties(context);
+        context.computeAndSetTypeEnvironmentForOperator(newOp);
+
+        String propertiesMsg = ">>>> Structural properties for " + newOp.getPhysicalOperator() + ": "
+                + newOp.getDeliveredPhysicalProperties() + "\n";
+        AlgebricksConfig.ALGEBRICKS_LOGGER.finest(propertiesMsg);
+
+        PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(newOp, context);
+    }
+
+}