You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:41 UTC

[28/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
new file mode 100644
index 0000000..695078a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -0,0 +1,469 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class SubstituteVariableVisitor implements ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> {
+
+    private final boolean goThroughNts;
+    private final ITypingContext ctx;
+
+    public SubstituteVariableVisitor(boolean goThroughNts, ITypingContext ctx) {
+        this.goThroughNts = goThroughNts;
+        this.ctx = ctx;
+    }
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        int n = variables.size();
+        for (int i = 0; i < n; i++) {
+            if (variables.get(i).equals(pair.first)) {
+                variables.set(i, pair.second);
+            } else {
+                op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        int n = variables.size();
+        for (int i = 0; i < n; i++) {
+            if (variables.get(i).equals(pair.first)) {
+                variables.set(i, pair.second);
+            } else {
+                op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
+            }
+        }
+        // Substitute variables stored in ordering property
+        if (op.getExplicitOrderingProperty() != null) {
+            List<OrderColumn> orderColumns = op.getExplicitOrderingProperty().getOrderColumns();
+            for (int i = 0; i < orderColumns.size(); i++) {
+                OrderColumn oc = orderColumns.get(i);
+                if (oc.getColumn().equals(pair.first)) {
+                    orderColumns.set(i, new OrderColumn(pair.second, oc.getOrder()));
+                }
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        for (int i = 0; i < variables.size(); i++) {
+            if (variables.get(i) == pair.first) {
+                variables.set(i, pair.second);
+                return null;
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> eRef : op.getExpressions()) {
+            eRef.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
+        // does not use any variable
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
+        // does not use any variable
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        subst(pair.first, pair.second, op.getGroupByList());
+        subst(pair.first, pair.second, op.getDecorList());
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,
+                        pair.second, goThroughNts, ctx);
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getCondition().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getCondition().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getMaxObjects().getValue().substituteVar(pair.first, pair.second);
+        ILogicalExpression offset = op.getOffset().getValue();
+        if (offset != null) {
+            offset.substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) {
+            oe.second.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> usedVariables = op.getVariables();
+        int n = usedVariables.size();
+        for (int i = 0; i < n; i++) {
+            LogicalVariable v = usedVariables.get(i);
+            if (v.equals(pair.first)) {
+                usedVariables.set(i, pair.second);
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        int n = variables.size();
+        for (int i = 0; i < n; i++) {
+            if (variables.get(i).equals(pair.first)) {
+                variables.set(i, pair.second);
+            } else {
+                op.getExpressions().get(i).getValue().substituteVar(pair.first, pair.second);
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        substInArray(op.getInputVariables(), pair.first, pair.second);
+        substInArray(op.getOutputVariables(), pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Pair<LogicalVariable, LogicalVariable> pair) {
+        op.getCondition().getValue().substituteVar(pair.first, pair.second);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                OperatorManipulationUtil.substituteVarRec((AbstractLogicalOperator) r.getValue(), pair.first,
+                        pair.second, goThroughNts, ctx);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> varMap = op.getVariableMappings();
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : varMap) {
+            if (t.first.equals(pair.first)) {
+                t.first = pair.second;
+            }
+            if (t.second.equals(pair.first)) {
+                t.second = pair.second;
+            }
+            if (t.third.equals(pair.first)) {
+                t.third = pair.second;
+            }
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        for (int i = 0; i < variables.size(); i++) {
+            if (variables.get(i) == pair.first) {
+                variables.set(i, pair.second);
+                return null;
+            }
+        }
+        op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        for (int i = 0; i < variables.size(); i++) {
+            if (variables.get(i) == pair.first) {
+                variables.set(i, pair.second);
+                return null;
+            }
+        }
+        op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
+        for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    private void subst(LogicalVariable v1, LogicalVariable v2,
+            List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> varExprPairList) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : varExprPairList) {
+            if (ve.first != null && ve.first.equals(v1)) {
+                ve.first = v2;
+                return;
+            }
+            ve.second.getValue().substituteVar(v1, v2);
+        }
+    }
+
+    private void substInArray(ArrayList<LogicalVariable> varArray, LogicalVariable v1, LogicalVariable v2) {
+        for (int i = 0; i < varArray.size(); i++) {
+            LogicalVariable v = varArray.get(i);
+            if (v == v1) {
+                varArray.set(i, v2);
+            }
+        }
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        op.substituteVar(arg.first, arg.second);
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        op.getPayloadExpression().getValue().substituteVar(pair.first, pair.second);
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().substituteVar(pair.first, pair.second);
+        }
+        substVarTypes(op, pair);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        return null;
+    }
+
+    private void substVarTypes(ILogicalOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        if (ctx == null) {
+            return;
+        }
+        IVariableTypeEnvironment env = ctx.getOutputTypeEnvironment(op);
+        env.substituteProducedVariable(arg.first, arg.second);
+    }
+
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitExternalDataLookupOperator(ExternalDataLookupOperator op,
+            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
+        List<LogicalVariable> variables = op.getVariables();
+        for (int i = 0; i < variables.size(); i++) {
+            if (variables.get(i) == pair.first) {
+                variables.set(i, pair.second);
+                return null;
+            }
+        }
+        op.getExpressionRef().getValue().substituteVar(pair.first, pair.second);
+        substVarTypes(op, pair);
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
new file mode 100644
index 0000000..8169ad0
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -0,0 +1,407 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.HashPartitionMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.RangePartitionPOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void> {
+
+    private Collection<LogicalVariable> usedVariables;
+
+    public UsedVariableVisitor(Collection<LogicalVariable> usedVariables) {
+        this.usedVariables = usedVariables;
+    }
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
+            exprRef.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
+            exprRef.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) {
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> eRef : op.getExpressions()) {
+            eRef.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, Void arg) {
+        // does not use any variable
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws AlgebricksException {
+        // Used variables depend on the physical operator.
+        if (op.getPhysicalOperator() != null) {
+            IPhysicalOperator physOp = op.getPhysicalOperator();
+            switch (physOp.getOperatorTag()) {
+                case BROADCAST_EXCHANGE:
+                case ONE_TO_ONE_EXCHANGE:
+                case RANDOM_MERGE_EXCHANGE: {
+                    // No variables used.
+                    break;
+                }
+                case HASH_PARTITION_EXCHANGE: {
+                    HashPartitionExchangePOperator concreteOp = (HashPartitionExchangePOperator) physOp;
+                    usedVariables.addAll(concreteOp.getHashFields());
+                    break;
+                }
+                case HASH_PARTITION_MERGE_EXCHANGE: {
+                    HashPartitionMergeExchangePOperator concreteOp = (HashPartitionMergeExchangePOperator) physOp;
+                    usedVariables.addAll(concreteOp.getPartitionFields());
+                    for (OrderColumn orderCol : concreteOp.getOrderColumns()) {
+                        usedVariables.add(orderCol.getColumn());
+                    }
+                    break;
+                }
+                case SORT_MERGE_EXCHANGE: {
+                    SortMergeExchangePOperator concreteOp = (SortMergeExchangePOperator) physOp;
+                    for (OrderColumn orderCol : concreteOp.getSortColumns()) {
+                        usedVariables.add(orderCol.getColumn());
+                    }
+                    break;
+                }
+                case RANGE_PARTITION_EXCHANGE: {
+                    RangePartitionPOperator concreteOp = (RangePartitionPOperator) physOp;
+                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+                        usedVariables.add(partCol.getColumn());
+                    }
+                    break;
+                }
+                case RANGE_PARTITION_MERGE_EXCHANGE: {
+                    RangePartitionMergePOperator concreteOp = (RangePartitionMergePOperator) physOp;
+                    for (OrderColumn partCol : concreteOp.getPartitioningFields()) {
+                        usedVariables.add(partCol.getColumn());
+                    }
+                    break;
+                }
+                default: {
+                    throw new AlgebricksException("Unhandled physical operator tag '" + physOp.getOperatorTag() + "'.");
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Void arg) throws AlgebricksException {
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                VariableUtilities.getUsedVariablesInDescendantsAndSelf(r.getValue(), usedVariables);
+            }
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : op.getGroupByList()) {
+            g.second.getValue().getUsedVariables(usedVariables);
+        }
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> g : op.getDecorList()) {
+            g.second.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) {
+        op.getCondition().getValue().getUsedVariables(usedVariables);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) {
+        op.getCondition().getValue().getUsedVariables(usedVariables);
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Void arg) {
+        op.getMaxObjects().getValue().getUsedVariables(usedVariables);
+        ILogicalExpression offsetExpr = op.getOffset().getValue();
+        if (offsetExpr != null) {
+            offsetExpr.getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, Void arg) {
+        // does not use any variable
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Void arg) {
+        for (Pair<IOrder, Mutable<ILogicalExpression>> oe : op.getOrderExpressions()) {
+            oe.second.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Void arg) {
+        List<LogicalVariable> parameterVariables = op.getVariables();
+        for (LogicalVariable v : parameterVariables) {
+            if (!usedVariables.contains(v)) {
+                usedVariables.add(v);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> exprRef : op.getExpressions()) {
+            exprRef.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Void arg) {
+        List<LogicalVariable> parameterVariables = op.getInputVariables();
+        for (LogicalVariable v : parameterVariables) {
+            if (!usedVariables.contains(v)) {
+                usedVariables.add(v);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Void arg) {
+        op.getCondition().getValue().getUsedVariables(usedVariables);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Void arg) throws AlgebricksException {
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                VariableUtilities.getUsedVariablesInDescendantsAndSelf(r.getValue(), usedVariables);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Void arg) {
+        for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> m : op.getVariableMappings()) {
+            if (!usedVariables.contains(m.first)) {
+                usedVariables.add(m.first);
+            }
+            if (!usedVariables.contains(m.second)) {
+                usedVariables.add(m.second);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) {
+        op.getExpressionRef().getValue().getUsedVariables(usedVariables);
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Void arg) {
+        op.getExpressionRef().getValue().getUsedVariables(usedVariables);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
+            expr.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
+            expr.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Void arg) {
+        op.getPayloadExpression().getValue().getUsedVariables(usedVariables);
+        for (Mutable<ILogicalExpression> e : op.getKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteOperator(InsertDeleteOperator op, Void arg) {
+        op.getPayloadExpression().getValue().getUsedVariables(usedVariables);
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        if (op.getAdditionalFilteringExpressions() != null) {
+            for (Mutable<ILogicalExpression> e : op.getAdditionalFilteringExpressions()) {
+                e.getValue().getUsedVariables(usedVariables);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) {
+        for (Mutable<ILogicalExpression> e : op.getPrimaryKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        for (Mutable<ILogicalExpression> e : op.getSecondaryKeyExpressions()) {
+            e.getValue().getUsedVariables(usedVariables);
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Void arg) {
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, Void arg) throws AlgebricksException {
+        op.getDelegate().getUsedVariables(usedVariables);
+        return null;
+    }
+
+    @Override
+    public Void visitExternalDataLookupOperator(ExternalDataLookupOperator op, Void arg) throws AlgebricksException {
+        op.getExpressionRef().getValue().getUsedVariables(usedVariables);
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
new file mode 100644
index 0000000..f3cf0a4
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class VariableUtilities {
+
+    public static void getUsedVariables(ILogicalOperator op, Collection<LogicalVariable> usedVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> visitor = new UsedVariableVisitor(usedVariables);
+        op.accept(visitor, null);
+    }
+
+    public static void getProducedVariables(ILogicalOperator op, Collection<LogicalVariable> producedVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> visitor = new ProducedVariableVisitor(producedVariables);
+        op.accept(visitor, null);
+    }
+
+    public static void getLiveVariables(ILogicalOperator op, Collection<LogicalVariable> schemaVariables)
+            throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Void> visitor = new SchemaVariableVisitor(schemaVariables);
+        op.accept(visitor, null);
+    }
+
+    public static void getUsedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+            throws AlgebricksException {
+        // DFS traversal
+        VariableUtilities.getUsedVariables(op, vars);
+        for (Mutable<ILogicalOperator> c : op.getInputs()) {
+            getUsedVariablesInDescendantsAndSelf(c.getValue(), vars);
+        }
+    }
+
+    public static void getProducedVariablesInDescendantsAndSelf(ILogicalOperator op, Collection<LogicalVariable> vars)
+            throws AlgebricksException {
+        // DFS traversal
+        VariableUtilities.getProducedVariables(op, vars);
+        for (Mutable<ILogicalOperator> c : op.getInputs()) {
+            getProducedVariablesInDescendantsAndSelf(c.getValue(), vars);
+        }
+    }
+
+    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+            ITypingContext ctx) throws AlgebricksException {
+        substituteVariables(op, v1, v2, true, ctx);
+    }
+
+    public static void substituteVariablesInDescendantsAndSelf(ILogicalOperator op, LogicalVariable v1,
+            LogicalVariable v2, ITypingContext ctx) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> childOp : op.getInputs()) {
+            substituteVariablesInDescendantsAndSelf(childOp.getValue(), v1, v2, ctx);
+        }
+        substituteVariables(op, v1, v2, true, ctx);
+    }
+
+    public static void substituteVariables(ILogicalOperator op, LogicalVariable v1, LogicalVariable v2,
+            boolean goThroughNts, ITypingContext ctx) throws AlgebricksException {
+        ILogicalOperatorVisitor<Void, Pair<LogicalVariable, LogicalVariable>> visitor = new SubstituteVariableVisitor(
+                goThroughNts, ctx);
+        op.accept(visitor, new Pair<LogicalVariable, LogicalVariable>(v1, v2));
+    }
+
+    public static <T> boolean varListEqualUnordered(List<T> var, List<T> varArg) {
+        Set<T> varSet = new HashSet<T>();
+        Set<T> varArgSet = new HashSet<T>();
+        varSet.addAll(var);
+        varArgSet.addAll(varArg);
+        return varSet.equals(varArgSet);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
new file mode 100644
index 0000000..42d964d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractExchangePOperator.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder.TargetConstraint;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.job.IConnectorDescriptorRegistry;
+
+public abstract class AbstractExchangePOperator extends AbstractPhysicalOperator {
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        Pair<IConnectorDescriptor, TargetConstraint> connPair = createConnectorDescriptor(builder.getJobSpec(), op,
+                opSchema, context);
+        builder.contributeConnectorWithTargetConstraint(op, connPair.first, connPair.second);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+    public abstract Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(
+            IConnectorDescriptorRegistry spec, ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context)
+            throws AlgebricksException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
new file mode 100644
index 0000000..764159d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+
+public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
+
+    protected List<LogicalVariable> keysLeftBranch;
+    protected List<LogicalVariable> keysRightBranch;
+
+    public AbstractHashJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType,
+            List<LogicalVariable> sideLeftOfEqualities, List<LogicalVariable> sideRightOfEqualities) {
+        super(kind, partitioningType);
+        this.keysLeftBranch = sideLeftOfEqualities;
+        this.keysRightBranch = sideRightOfEqualities;
+    }
+
+    public List<LogicalVariable> getKeysLeftBranch() {
+        return keysLeftBranch;
+    }
+
+    public List<LogicalVariable> getKeysRightBranch() {
+        return keysRightBranch;
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context)
+            throws AlgebricksException {
+        IPartitioningProperty pp;
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            AbstractLogicalOperator op0 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+            IPhysicalPropertiesVector pv0 = op0.getPhysicalOperator().getDeliveredProperties();
+            AbstractLogicalOperator op1 = (AbstractLogicalOperator) op.getInputs().get(1).getValue();
+            IPhysicalPropertiesVector pv1 = op1.getPhysicalOperator().getDeliveredProperties();
+
+            if (pv0 == null || pv1 == null) {
+                pp = null;
+            } else {
+                pp = pv0.getPartitioningProperty();
+            }
+        } else {
+            pp = IPartitioningProperty.UNPARTITIONED;
+        }
+        this.deliveredProperties = new StructuralPropertiesVector(pp, deliveredLocalProperties(iop, context));
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
+            IPhysicalPropertiesVector reqdByParent) {
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[2];
+        // In a cost-based optimizer, we would also try to propagate the
+        // parent's partitioning requirements.
+        AbstractLogicalOperator op = (AbstractLogicalOperator) iop;
+
+        IPartitioningProperty pp1 = null;
+        IPartitioningProperty pp2 = null;
+        if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
+            switch (partitioningType) {
+                case PAIRWISE: {
+                    pp1 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysLeftBranch), null);
+                    pp2 = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(keysRightBranch), null);
+                    break;
+                }
+                case BROADCAST: {
+                    pp2 = new BroadcastPartitioningProperty(null);
+                    break;
+                }
+                default: {
+                    throw new IllegalStateException();
+                }
+            }
+        }
+
+        pv[0] = new StructuralPropertiesVector(pp1, null);
+        pv[1] = new StructuralPropertiesVector(pp2, null);
+
+        IPartitioningRequirementsCoordinator prc;
+        switch (kind) {
+            case INNER: {
+                prc = IPartitioningRequirementsCoordinator.EQCLASS_PARTITIONING_COORDINATOR;
+                break;
+            }
+            case LEFT_OUTER: {
+                prc = new IPartitioningRequirementsCoordinator() {
+
+                    @Override
+                    public Pair<Boolean, IPartitioningProperty> coordinateRequirements(
+                            IPartitioningProperty requirements, IPartitioningProperty firstDeliveredPartitioning,
+                            ILogicalOperator op, IOptimizationContext context) throws AlgebricksException {
+                        if (firstDeliveredPartitioning != null
+                                && firstDeliveredPartitioning.getPartitioningType() == requirements
+                                        .getPartitioningType()) {
+                            switch (requirements.getPartitioningType()) {
+                                case UNORDERED_PARTITIONED: {
+                                    UnorderedPartitionedProperty upp1 = (UnorderedPartitionedProperty) firstDeliveredPartitioning;
+                                    Set<LogicalVariable> set1 = upp1.getColumnSet();
+                                    UnorderedPartitionedProperty uppreq = (UnorderedPartitionedProperty) requirements;
+                                    Set<LogicalVariable> modifuppreq = new ListSet<LogicalVariable>();
+                                    Map<LogicalVariable, EquivalenceClass> eqmap = context.getEquivalenceClassMap(op);
+                                    Set<LogicalVariable> covered = new ListSet<LogicalVariable>();
+                                    for (LogicalVariable r : uppreq.getColumnSet()) {
+                                        EquivalenceClass ecSnd = eqmap.get(r);
+                                        boolean found = false;
+                                        int j = 0;
+                                        for (LogicalVariable rvar : keysRightBranch) {
+                                            if (rvar == r || ecSnd != null && eqmap.get(rvar) == ecSnd) {
+                                                found = true;
+                                                break;
+                                            }
+                                            j++;
+                                        }
+                                        if (!found) {
+                                            throw new IllegalStateException("Did not find a variable equivalent to "
+                                                    + r + " among " + keysRightBranch);
+                                        }
+                                        LogicalVariable v2 = keysLeftBranch.get(j);
+                                        EquivalenceClass ecFst = eqmap.get(v2);
+                                        for (LogicalVariable vset1 : set1) {
+                                            if (vset1 == v2 || ecFst != null && eqmap.get(vset1) == ecFst) {
+                                                covered.add(vset1);
+                                                modifuppreq.add(r);
+                                                break;
+                                            }
+                                        }
+                                    }
+                                    if (!covered.equals(set1)) {
+                                        throw new AlgebricksException("Could not modify " + requirements
+                                                + " to agree with partitioning property " + firstDeliveredPartitioning
+                                                + " delivered by previous input operator.");
+                                    }
+                                    UnorderedPartitionedProperty upp2 = new UnorderedPartitionedProperty(modifuppreq,
+                                            requirements.getNodeDomain());
+                                    return new Pair<Boolean, IPartitioningProperty>(false, upp2);
+                                }
+                                case ORDERED_PARTITIONED: {
+                                    throw new NotImplementedException();
+                                }
+                            }
+                        }
+                        return new Pair<Boolean, IPartitioningProperty>(true, requirements);
+                    }
+                };
+                break;
+            }
+            default: {
+                throw new IllegalStateException();
+            }
+        }
+
+        return new PhysicalRequirements(pv, prc);
+    }
+
+    protected abstract List<ILocalStructuralProperty> deliveredLocalProperties(ILogicalOperator op,
+            IOptimizationContext context) throws AlgebricksException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
new file mode 100644
index 0000000..fcc04ab
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractJoinPOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+
+public abstract class AbstractJoinPOperator extends AbstractPhysicalOperator {
+
+    public enum JoinPartitioningType {
+        PAIRWISE, BROADCAST
+    }
+
+    protected final JoinKind kind;
+    protected final JoinPartitioningType partitioningType;
+
+    public AbstractJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType) {
+        this.kind = kind;
+        this.partitioningType = partitioningType;
+    }
+
+    public JoinKind getKind() {
+        return kind;
+    }
+
+    public JoinPartitioningType getPartitioningType() {
+        return partitioningType;
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 1, 0 };
+        int[] outputDependencyLabels = new int[] { 1 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
new file mode 100644
index 0000000..66e7b98
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPhysicalOperator.java
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.PlanCompiler;
+import edu.uci.ics.hyracks.algebricks.runtime.base.AlgebricksPipeline;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public abstract class AbstractPhysicalOperator implements IPhysicalOperator {
+
+    protected IPhysicalPropertiesVector deliveredProperties;
+    private boolean disableJobGenBelow = false;
+    private Object hostQueryContext;
+
+    @Override
+    public final IPhysicalPropertiesVector getDeliveredProperties() {
+        return deliveredProperties;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString();
+    }
+
+    public void setHostQueryContext(Object context) {
+        this.hostQueryContext = context;
+    }
+
+    public Object getHostQueryContext() {
+        return hostQueryContext;
+    }
+
+    protected PhysicalRequirements emptyUnaryRequirements() {
+        StructuralPropertiesVector[] req = new StructuralPropertiesVector[] { StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR };
+        return new PhysicalRequirements(req, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    protected PhysicalRequirements emptyUnaryRequirements(int numberOfChildren) {
+        StructuralPropertiesVector[] req = new StructuralPropertiesVector[numberOfChildren];
+        for (int i = 0; i < numberOfChildren; i++) {
+            req[i] = StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
+        }
+        return new PhysicalRequirements(req, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    @Override
+    public void disableJobGenBelowMe() {
+        this.disableJobGenBelow = true;
+    }
+
+    @Override
+    public boolean isJobGenDisabledBelowMe() {
+        return disableJobGenBelow;
+    }
+
+    /**
+     * @return labels (0 or 1) for each input and output indicating the dependency between them.
+     *         The edges labeled as 1 must wait for the edges with label 0.
+     */
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[op.getInputs().size()]; // filled with 0's
+        int[] outputDependencyLabels = new int[] { 0 };
+        return new Pair<int[], int[]>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    protected void contributeOpDesc(IHyracksJobBuilder builder, AbstractLogicalOperator op, IOperatorDescriptor opDesc) {
+        if (op.getExecutionMode() == ExecutionMode.UNPARTITIONED) {
+            AlgebricksPartitionConstraint apc = new AlgebricksCountPartitionConstraint(1);
+            builder.contributeAlgebricksPartitionConstraint(opDesc, apc);
+        }
+        builder.contributeHyracksOperator(op, opDesc);
+    }
+
+    protected AlgebricksPipeline[] compileSubplans(IOperatorSchema outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, JobGenContext context)
+            throws AlgebricksException {
+        AlgebricksPipeline[] subplans = new AlgebricksPipeline[npOp.getNestedPlans().size()];
+        PlanCompiler pc = new PlanCompiler(context);
+        int i = 0;
+        for (ILogicalPlan p : npOp.getNestedPlans()) {
+            subplans[i++] = buildPipelineWithProjection(p, outerPlanSchema, npOp, opSchema, pc);
+        }
+        return subplans;
+    }
+
+    private AlgebricksPipeline buildPipelineWithProjection(ILogicalPlan p, IOperatorSchema outerPlanSchema,
+            AbstractOperatorWithNestedPlans npOp, IOperatorSchema opSchema, PlanCompiler pc) throws AlgebricksException {
+        if (p.getRoots().size() > 1) {
+            throw new NotImplementedException("Nested plans with several roots are not supported.");
+        }
+        JobSpecification nestedJob = pc.compilePlan(p, outerPlanSchema, null);
+        ILogicalOperator topOpInSubplan = p.getRoots().get(0).getValue();
+        JobGenContext context = pc.getContext();
+        IOperatorSchema topOpInSubplanScm = context.getSchema(topOpInSubplan);
+        opSchema.addAllVariables(topOpInSubplanScm);
+
+        Map<OperatorDescriptorId, IOperatorDescriptor> opMap = nestedJob.getOperatorMap();
+        if (opMap.size() != 1) {
+            throw new AlgebricksException(
+                    "Attempting to construct a nested plan with "
+                            + opMap.size()
+                            + " operator descriptors. Currently, nested plans can only consist in linear pipelines of Asterix micro operators.");
+        }
+
+        for (OperatorDescriptorId oid : opMap.keySet()) {
+            IOperatorDescriptor opd = opMap.get(oid);
+            if (!(opd instanceof AlgebricksMetaOperatorDescriptor)) {
+                throw new AlgebricksException(
+                        "Can only generate Hyracks jobs for pipelinable Asterix nested plans, not for "
+                                + opd.getClass().getName());
+            }
+            AlgebricksMetaOperatorDescriptor amod = (AlgebricksMetaOperatorDescriptor) opd;
+
+            return amod.getPipeline();
+            // we suppose that the top operator in the subplan already does the
+            // projection for us
+        }
+
+        throw new IllegalStateException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
new file mode 100644
index 0000000..41711cb
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPreclusteredGroupByPOperator.java
@@ -0,0 +1,265 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.utils.ListSet;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty.PropertyType;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.PropertiesUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
+
+public abstract class AbstractPreclusteredGroupByPOperator extends AbstractPhysicalOperator {
+
+    protected List<LogicalVariable> columnList;
+
+    public AbstractPreclusteredGroupByPOperator(List<LogicalVariable> columnList) {
+        this.columnList = columnList;
+    }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + columnList;
+    }
+
+    public List<LogicalVariable> getGbyColumns() {
+        return columnList;
+    }
+
+    public void setGbyColumns(List<LogicalVariable> gByColumns) {
+        this.columnList = gByColumns;
+    }
+
+    // Obs: We don't propagate properties corresponding to decors, since they
+    // are func. dep. on the group-by variables.
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        List<ILocalStructuralProperty> propsLocal = new LinkedList<ILocalStructuralProperty>();
+        GroupByOperator gby = (GroupByOperator) op;
+        ILogicalOperator op2 = gby.getInputs().get(0).getValue();
+        IPhysicalPropertiesVector childProp = op2.getDeliveredPhysicalProperties();
+        IPartitioningProperty pp = childProp.getPartitioningProperty();
+        List<ILocalStructuralProperty> childLocals = childProp.getLocalProperties();
+        if (childLocals != null) {
+            for (ILocalStructuralProperty lsp : childLocals) {
+                boolean failed = false;
+                switch (lsp.getPropertyType()) {
+                    case LOCAL_GROUPING_PROPERTY: {
+                        LocalGroupingProperty lgp = (LocalGroupingProperty) lsp;
+                        Set<LogicalVariable> colSet = new ListSet<LogicalVariable>();
+                        for (LogicalVariable v : lgp.getColumnSet()) {
+                            LogicalVariable v2 = getLhsGbyVar(gby, v);
+                            if (v2 != null) {
+                                colSet.add(v2);
+                            } else {
+                                failed = true;
+                            }
+                        }
+                        if (!failed) {
+                            propsLocal.add(new LocalGroupingProperty(colSet));
+                        }
+                        break;
+                    }
+                    case LOCAL_ORDER_PROPERTY: {
+                        LocalOrderProperty lop = (LocalOrderProperty) lsp;
+                        List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+                        for (OrderColumn oc : lop.getOrderColumns()) {
+                            LogicalVariable v2 = getLhsGbyVar(gby, oc.getColumn());
+                            if (v2 != null) {
+                                orderColumns.add(new OrderColumn(v2, oc.getOrder()));
+                            } else {
+                                failed = true;
+                            }
+                        }
+                        if (!failed) {
+                            propsLocal.add(new LocalOrderProperty(orderColumns));
+                        }
+                        break;
+                    }
+                    default: {
+                        throw new IllegalStateException();
+                    }
+                }
+                if (failed) {
+                    break;
+                }
+            }
+        }
+        deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent) {
+        StructuralPropertiesVector[] pv = new StructuralPropertiesVector[1];
+        List<ILocalStructuralProperty> localProps = null;
+
+        localProps = new ArrayList<ILocalStructuralProperty>(1);
+        Set<LogicalVariable> gbvars = new ListSet<LogicalVariable>(columnList);
+        LocalGroupingProperty groupProp = new LocalGroupingProperty(gbvars, new ArrayList<LogicalVariable>(columnList));
+
+        GroupByOperator gby = (GroupByOperator) op;
+        boolean goon = true;
+        for (ILogicalPlan p : gby.getNestedPlans()) {
+            // try to propagate secondary order requirements from nested
+            // groupings
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                AbstractLogicalOperator op1 = (AbstractLogicalOperator) r.getValue();
+                if (op1.getOperatorTag() == LogicalOperatorTag.AGGREGATE) {
+                    AbstractLogicalOperator op2 = (AbstractLogicalOperator) op1.getInputs().get(0).getValue();
+                    IPhysicalOperator pop2 = op2.getPhysicalOperator();
+                    if (pop2 instanceof AbstractPreclusteredGroupByPOperator) {
+                        List<LogicalVariable> sndOrder = ((AbstractPreclusteredGroupByPOperator) pop2).getGbyColumns();
+                        groupProp.getColumnSet().addAll(sndOrder);
+                        groupProp.getPreferredOrderEnforcer().addAll(sndOrder);
+                        goon = false;
+                        break;
+                    }
+                }
+            }
+            if (!goon) {
+                break;
+            }
+        }
+
+        localProps.add(groupProp);
+
+        if (reqdByParent != null) {
+            // propagate parent requirements
+            List<ILocalStructuralProperty> lpPar = reqdByParent.getLocalProperties();
+            if (lpPar != null) {
+                boolean allOk = true;
+                List<ILocalStructuralProperty> props = new ArrayList<ILocalStructuralProperty>(lpPar.size());
+                for (ILocalStructuralProperty prop : lpPar) {
+                    if (prop.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
+                        allOk = false;
+                        break;
+                    }
+                    LocalOrderProperty lop = (LocalOrderProperty) prop;
+                    List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+                    List<OrderColumn> ords = lop.getOrderColumns();
+                    for (OrderColumn ord : ords) {
+                        Pair<LogicalVariable, Mutable<ILogicalExpression>> p = getGbyPairByRhsVar(gby, ord.getColumn());
+                        if (p == null) {
+                            p = getDecorPairByRhsVar(gby, ord.getColumn());
+                            if (p == null) {
+                                allOk = false;
+                                break;
+                            }
+                        }
+                        ILogicalExpression e = p.second.getValue();
+                        if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                            throw new IllegalStateException(
+                                    "Right hand side of group-by assignment should have been normalized to a variable reference.");
+                        }
+                        LogicalVariable v = ((VariableReferenceExpression) e).getVariableReference();
+                        orderColumns.add(new OrderColumn(v, ord.getOrder()));
+                    }
+                    props.add(new LocalOrderProperty(orderColumns));
+                }
+                List<FunctionalDependency> fdList = new ArrayList<FunctionalDependency>();
+                for (Pair<LogicalVariable, Mutable<ILogicalExpression>> decorPair : gby.getDecorList()) {
+                    List<LogicalVariable> hd = gby.getGbyVarList();
+                    List<LogicalVariable> tl = new ArrayList<LogicalVariable>(1);
+                    tl.add(((VariableReferenceExpression) decorPair.second.getValue()).getVariableReference());
+                    fdList.add(new FunctionalDependency(hd, tl));
+                }
+                if (allOk
+                        && PropertiesUtil.matchLocalProperties(localProps, props,
+                                new HashMap<LogicalVariable, EquivalenceClass>(), fdList)) {
+                    localProps = props;
+                }
+            }
+        }
+
+        IPartitioningProperty pp = null;
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) op;
+        if (aop.getExecutionMode() == ExecutionMode.PARTITIONED) {
+            pp = new UnorderedPartitionedProperty(new ListSet<LogicalVariable>(columnList), null);
+        }
+        pv[0] = new StructuralPropertiesVector(pp, localProps);
+        return new PhysicalRequirements(pv, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    private static Pair<LogicalVariable, Mutable<ILogicalExpression>> getGbyPairByRhsVar(GroupByOperator gby,
+            LogicalVariable var) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gby.getGroupByList()) {
+            if (ve.first == var) {
+                return ve;
+            }
+        }
+        return null;
+    }
+
+    private static Pair<LogicalVariable, Mutable<ILogicalExpression>> getDecorPairByRhsVar(GroupByOperator gby,
+            LogicalVariable var) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gby.getDecorList()) {
+            if (ve.first == var) {
+                return ve;
+            }
+        }
+        return null;
+    }
+
+    private static LogicalVariable getLhsGbyVar(GroupByOperator gby, LogicalVariable var) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> ve : gby.getGroupByList()) {
+            ILogicalExpression e = ve.second.getValue();
+            if (e.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
+                throw new IllegalStateException(
+                        "Right hand side of group by assignment should have been normalized to a variable reference.");
+            }
+            LogicalVariable v = ((VariableReferenceExpression) e).getVariableReference();
+            if (v == var) {
+                return ve.first;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java
new file mode 100644
index 0000000..c77222b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractPropagatePropertiesForUsedVariablesPOperator.java
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public abstract class AbstractPropagatePropertiesForUsedVariablesPOperator extends AbstractPhysicalOperator {
+
+    public void computeDeliveredPropertiesForUsedVariables(ILogicalOperator op, List<LogicalVariable> usedVariables) {
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        IPartitioningProperty pp = op2.getDeliveredPhysicalProperties().getPartitioningProperty();
+        List<ILocalStructuralProperty> downPropsLocal = op2.getDeliveredPhysicalProperties().getLocalProperties();
+        List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
+        for (ILocalStructuralProperty lsp : downPropsLocal) {
+            LinkedList<LogicalVariable> cols = new LinkedList<LogicalVariable>();
+            lsp.getColumns(cols);
+            ILocalStructuralProperty propagatedProp = lsp.retainVariables(usedVariables);
+            if (propagatedProp != null) {
+                propsLocal.add(propagatedProp);
+            }
+        }
+        deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
+    }
+}