You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:34 UTC

[21/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
new file mode 100644
index 0000000..4cb111d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/ConnectorPolicyAssignmentPolicy.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.IConnectorPolicyAssignmentPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.PipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.api.dataflow.connectors.SendSideMaterializedPipeliningConnectorPolicy;
+import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
+
+public class ConnectorPolicyAssignmentPolicy implements IConnectorPolicyAssignmentPolicy {
+    private static final long serialVersionUID = 1L;
+    private IConnectorPolicy senderSideMaterializePolicy = new SendSideMaterializedPipeliningConnectorPolicy();
+    private IConnectorPolicy pipeliningPolicy = new PipeliningConnectorPolicy();
+
+    @Override
+    public IConnectorPolicy getConnectorPolicyAssignment(IConnectorDescriptor c, int nProducers, int nConsumers,
+            int[] fanouts) {
+        if (c instanceof MToNPartitioningMergingConnectorDescriptor) {
+            return senderSideMaterializePolicy;
+        } else {
+            return pipeliningPolicy;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
new file mode 100644
index 0000000..e1f897c
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobBuilder.java
@@ -0,0 +1,330 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator.ExecutionMode;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
+import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.ConnectorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
+import edu.uci.ics.hyracks.api.dataflow.OperatorDescriptorId;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class JobBuilder implements IHyracksJobBuilder {
+
+    private JobSpecification jobSpec;
+    private AlgebricksPartitionConstraint clusterLocations;
+
+    private Map<ILogicalOperator, ArrayList<ILogicalOperator>> outEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
+    private Map<ILogicalOperator, ArrayList<ILogicalOperator>> inEdges = new HashMap<ILogicalOperator, ArrayList<ILogicalOperator>>();
+    private Map<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>> connectors = new HashMap<ILogicalOperator, Pair<IConnectorDescriptor, TargetConstraint>>();
+
+    private Map<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>> microOps = new HashMap<ILogicalOperator, Pair<IPushRuntimeFactory, RecordDescriptor>>();
+    private Map<IPushRuntimeFactory, ILogicalOperator> revMicroOpMap = new HashMap<IPushRuntimeFactory, ILogicalOperator>();
+    private Map<ILogicalOperator, IOperatorDescriptor> hyracksOps = new HashMap<ILogicalOperator, IOperatorDescriptor>();
+    private Map<ILogicalOperator, AlgebricksPartitionConstraint> pcForMicroOps = new HashMap<ILogicalOperator, AlgebricksPartitionConstraint>();
+
+    private int aodCounter = 0;
+    private Map<ILogicalOperator, Integer> algebraicOpBelongingToMetaAsterixOp = new HashMap<ILogicalOperator, Integer>();
+    private Map<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>> metaAsterixOpSkeletons = new HashMap<Integer, List<Pair<IPushRuntimeFactory, RecordDescriptor>>>();
+    private Map<Integer, AlgebricksMetaOperatorDescriptor> metaAsterixOps = new HashMap<Integer, AlgebricksMetaOperatorDescriptor>();
+    private final Map<IOperatorDescriptor, AlgebricksPartitionConstraint> partitionConstraintMap = new HashMap<IOperatorDescriptor, AlgebricksPartitionConstraint>();
+
+    public JobBuilder(JobSpecification jobSpec, AlgebricksPartitionConstraint clusterLocations) {
+        this.jobSpec = jobSpec;
+        this.clusterLocations = clusterLocations;
+    }
+
+    @Override
+    public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc) {
+        contributeMicroOperator(op, runtime, recDesc, null);
+    }
+
+    @Override
+    public void contributeMicroOperator(ILogicalOperator op, IPushRuntimeFactory runtime, RecordDescriptor recDesc,
+            AlgebricksPartitionConstraint pc) {
+        microOps.put(op, new Pair<IPushRuntimeFactory, RecordDescriptor>(runtime, recDesc));
+        revMicroOpMap.put(runtime, op);
+        if (pc != null) {
+            pcForMicroOps.put(op, pc);
+        }
+        AbstractLogicalOperator logicalOp = (AbstractLogicalOperator) op;
+        if (logicalOp.getExecutionMode() == ExecutionMode.UNPARTITIONED && pc == null) {
+            AlgebricksPartitionConstraint apc = new AlgebricksCountPartitionConstraint(1);
+            pcForMicroOps.put(logicalOp, apc);
+        }
+    }
+
+    @Override
+    public void contributeConnector(ILogicalOperator exchgOp, IConnectorDescriptor conn) {
+        connectors.put(exchgOp, new Pair<IConnectorDescriptor, TargetConstraint>(conn, null));
+    }
+
+    @Override
+    public void contributeConnectorWithTargetConstraint(ILogicalOperator exchgOp, IConnectorDescriptor conn,
+            TargetConstraint numberOfTargetPartitions) {
+        connectors.put(exchgOp, new Pair<IConnectorDescriptor, TargetConstraint>(conn, numberOfTargetPartitions));
+    }
+
+    @Override
+    public void contributeGraphEdge(ILogicalOperator src, int srcOutputIndex, ILogicalOperator dest, int destInputIndex) {
+        ArrayList<ILogicalOperator> outputs = outEdges.get(src);
+        if (outputs == null) {
+            outputs = new ArrayList<ILogicalOperator>();
+            outEdges.put(src, outputs);
+        }
+        addAtPos(outputs, dest, srcOutputIndex);
+
+        ArrayList<ILogicalOperator> inp = inEdges.get(dest);
+        if (inp == null) {
+            inp = new ArrayList<ILogicalOperator>();
+            inEdges.put(dest, inp);
+        }
+        addAtPos(inp, src, destInputIndex);
+    }
+
+    @Override
+    public void contributeHyracksOperator(ILogicalOperator op, IOperatorDescriptor opDesc) {
+        hyracksOps.put(op, opDesc);
+    }
+
+    @Override
+    public void contributeAlgebricksPartitionConstraint(IOperatorDescriptor opDesc, AlgebricksPartitionConstraint apc) {
+        partitionConstraintMap.put(opDesc, apc);
+    }
+
+    @Override
+    public JobSpecification getJobSpec() {
+        return jobSpec;
+    }
+
+    @Override
+    public void buildSpec(List<ILogicalOperator> roots) throws AlgebricksException {
+        buildAsterixComponents();
+        Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = setupConnectors();
+        for (ILogicalOperator r : roots) {
+            IOperatorDescriptor opDesc = findOpDescForAlgebraicOp(r);
+            jobSpec.addRoot(opDesc);
+        }
+        setAllPartitionConstraints(tgtConstraints);
+    }
+
+    private void setAllPartitionConstraints(Map<IConnectorDescriptor, TargetConstraint> tgtConstraints) {
+        List<OperatorDescriptorId> roots = jobSpec.getRoots();
+        setSpecifiedPartitionConstraints();
+        for (OperatorDescriptorId rootId : roots) {
+            setPartitionConstraintsDFS(rootId, tgtConstraints, null);
+        }
+    }
+
+    private void setSpecifiedPartitionConstraints() {
+        for (ILogicalOperator op : pcForMicroOps.keySet()) {
+            AlgebricksPartitionConstraint pc = pcForMicroOps.get(op);
+            Integer k = algebraicOpBelongingToMetaAsterixOp.get(op);
+            AlgebricksMetaOperatorDescriptor amod = metaAsterixOps.get(k);
+            partitionConstraintMap.put(amod, pc);
+        }
+        for (IOperatorDescriptor opDesc : partitionConstraintMap.keySet()) {
+            AlgebricksPartitionConstraint pc = partitionConstraintMap.get(opDesc);
+            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, pc);
+        }
+    }
+
+    private void setPartitionConstraintsDFS(OperatorDescriptorId opId,
+            Map<IConnectorDescriptor, TargetConstraint> tgtConstraints, IOperatorDescriptor parentOp) {
+        List<IConnectorDescriptor> opInputs = jobSpec.getOperatorInputMap().get(opId);
+        AlgebricksPartitionConstraint opConstraint = null;
+        IOperatorDescriptor opDesc = jobSpec.getOperatorMap().get(opId);
+        if (opInputs != null) {
+            for (IConnectorDescriptor conn : opInputs) {
+                ConnectorDescriptorId cid = conn.getConnectorId();
+                org.apache.commons.lang3.tuple.Pair<org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>, org.apache.commons.lang3.tuple.Pair<IOperatorDescriptor, Integer>> p = jobSpec
+                        .getConnectorOperatorMap().get(cid);
+                IOperatorDescriptor src = p.getLeft().getLeft();
+                // DFS
+                setPartitionConstraintsDFS(src.getOperatorId(), tgtConstraints, opDesc);
+
+                TargetConstraint constraint = tgtConstraints.get(conn);
+                if (constraint != null) {
+                    switch (constraint) {
+                        case ONE: {
+                            opConstraint = new AlgebricksCountPartitionConstraint(1);
+                            break;
+                        }
+                        case SAME_COUNT: {
+                            opConstraint = partitionConstraintMap.get(src);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        if (partitionConstraintMap.get(opDesc) == null) {
+            if (opConstraint == null) {
+                if (parentOp != null) {
+                    AlgebricksPartitionConstraint pc = partitionConstraintMap.get(parentOp);
+                    if (pc != null) {
+                        opConstraint = pc;
+                    } else if (opInputs == null || opInputs.size() == 0) {
+                        opConstraint = new AlgebricksCountPartitionConstraint(1);
+                    }
+                }
+                if (opConstraint == null) {
+                    opConstraint = clusterLocations;
+                }
+            }
+            partitionConstraintMap.put(opDesc, opConstraint);
+            AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, opDesc, opConstraint);
+        }
+    }
+
+    private Map<IConnectorDescriptor, TargetConstraint> setupConnectors() throws AlgebricksException {
+        Map<IConnectorDescriptor, TargetConstraint> tgtConstraints = new HashMap<IConnectorDescriptor, TargetConstraint>();
+        for (ILogicalOperator exchg : connectors.keySet()) {
+            ILogicalOperator inOp = inEdges.get(exchg).get(0);
+            ILogicalOperator outOp = outEdges.get(exchg).get(0);
+            IOperatorDescriptor inOpDesc = findOpDescForAlgebraicOp(inOp);
+            IOperatorDescriptor outOpDesc = findOpDescForAlgebraicOp(outOp);
+            Pair<IConnectorDescriptor, TargetConstraint> connPair = connectors.get(exchg);
+            IConnectorDescriptor conn = connPair.first;
+            int producerPort = outEdges.get(inOp).indexOf(exchg);
+            int consumerPort = inEdges.get(outOp).indexOf(exchg);
+            jobSpec.connect(conn, inOpDesc, producerPort, outOpDesc, consumerPort);
+            if (connPair.second != null) {
+                tgtConstraints.put(conn, connPair.second);
+            }
+        }
+        return tgtConstraints;
+    }
+
+    private IOperatorDescriptor findOpDescForAlgebraicOp(ILogicalOperator op) throws AlgebricksException {
+        IOperatorDescriptor hOpDesc = hyracksOps.get(op);
+        if (hOpDesc != null) {
+            return hOpDesc;
+        }
+        Integer metaOpKey = algebraicOpBelongingToMetaAsterixOp.get(op);
+        if (metaOpKey == null) {
+            throw new AlgebricksException("Could not generate operator descriptor for operator " + op);
+        }
+        return metaAsterixOps.get(metaOpKey);
+    }
+
+    private void buildAsterixComponents() {
+        for (ILogicalOperator aop : microOps.keySet()) {
+            addMicroOpToMetaRuntimeOp(aop);
+        }
+        for (Integer k : metaAsterixOpSkeletons.keySet()) {
+            List<Pair<IPushRuntimeFactory, RecordDescriptor>> opContents = metaAsterixOpSkeletons.get(k);
+            AlgebricksMetaOperatorDescriptor amod = buildMetaAsterixOpDesc(opContents);
+            metaAsterixOps.put(k, amod);
+        }
+    }
+
+    private AlgebricksMetaOperatorDescriptor buildMetaAsterixOpDesc(
+            List<Pair<IPushRuntimeFactory, RecordDescriptor>> opContents) {
+        // RecordDescriptor outputRecordDesc = null;
+        int n = opContents.size();
+        IPushRuntimeFactory[] runtimeFactories = new IPushRuntimeFactory[n];
+        RecordDescriptor[] internalRecordDescriptors = new RecordDescriptor[n];
+        int i = 0;
+        for (Pair<IPushRuntimeFactory, RecordDescriptor> p : opContents) {
+            runtimeFactories[i] = p.first;
+            internalRecordDescriptors[i] = p.second;
+            // if (i == n - 1) {
+            // outputRecordDesc = p.second;
+            // }
+            i++;
+        }
+        ILogicalOperator lastLogicalOp = revMicroOpMap.get(runtimeFactories[n - 1]);
+        ArrayList<ILogicalOperator> outOps = outEdges.get(lastLogicalOp);
+        int outArity = (outOps == null) ? 0 : outOps.size();
+        ILogicalOperator firstLogicalOp = revMicroOpMap.get(runtimeFactories[0]);
+        ArrayList<ILogicalOperator> inOps = inEdges.get(firstLogicalOp);
+        int inArity = (inOps == null) ? 0 : inOps.size();
+        // boolean isLeafOp = inEdges.get(firstLogicalOp) == null;
+        return new AlgebricksMetaOperatorDescriptor(jobSpec, inArity, outArity, runtimeFactories,
+                internalRecordDescriptors);
+    }
+
+    private void addMicroOpToMetaRuntimeOp(ILogicalOperator aop) {
+        Integer k = algebraicOpBelongingToMetaAsterixOp.get(aop);
+        if (k == null) {
+            k = createNewMetaOpInfo(aop);
+        }
+        ArrayList<ILogicalOperator> destList = outEdges.get(aop);
+        if (destList == null || destList.size() != 1) {
+            // for now, we only support linear plans inside meta-ops.
+            return;
+        }
+        ILogicalOperator dest = destList.get(0);
+        Integer j = algebraicOpBelongingToMetaAsterixOp.get(dest);
+        if (j == null && microOps.get(dest) != null) {
+            algebraicOpBelongingToMetaAsterixOp.put(dest, k);
+            List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = metaAsterixOpSkeletons.get(k);
+            aodContent1.add(microOps.get(dest));
+        } else if (j != null && j.intValue() != k.intValue()) {
+            // merge the j component into the k component
+            List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent1 = metaAsterixOpSkeletons.get(k);
+            List<Pair<IPushRuntimeFactory, RecordDescriptor>> aodContent2 = metaAsterixOpSkeletons.get(j);
+            aodContent1.addAll(aodContent2);
+            metaAsterixOpSkeletons.remove(j);
+            for (ILogicalOperator m : algebraicOpBelongingToMetaAsterixOp.keySet()) {
+                Integer g = algebraicOpBelongingToMetaAsterixOp.get(m);
+                if (g.intValue() == j.intValue()) {
+                    algebraicOpBelongingToMetaAsterixOp.put(m, k);
+                }
+            }
+        }
+
+    }
+
+    private int createNewMetaOpInfo(ILogicalOperator aop) {
+        int n = aodCounter;
+        aodCounter++;
+        List<Pair<IPushRuntimeFactory, RecordDescriptor>> metaOpContents = new ArrayList<Pair<IPushRuntimeFactory, RecordDescriptor>>();
+        metaOpContents.add(microOps.get(aop));
+        metaAsterixOpSkeletons.put(n, metaOpContents);
+        algebraicOpBelongingToMetaAsterixOp.put(aop, n);
+        return n;
+    }
+
+    private <E> void addAtPos(ArrayList<E> a, E elem, int pos) {
+        int n = a.size();
+        if (n > pos) {
+            a.set(pos, elem);
+        } else {
+            for (int k = n; k < pos; k++) {
+                a.add(null);
+            }
+            a.add(elem);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
new file mode 100644
index 0000000..a181304
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryBooleanInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
+
+public class JobGenContext {
+    private final IOperatorSchema outerFlowSchema;
+    private final Map<ILogicalOperator, IOperatorSchema> schemaMap = new HashMap<ILogicalOperator, IOperatorSchema>();
+    private final ISerializerDeserializerProvider serializerDeserializerProvider;
+    private final IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider;
+    private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider;
+    private final IBinaryComparatorFactoryProvider comparatorFactoryProvider;
+    private final IPrinterFactoryProvider printerFactoryProvider;
+    private final ITypeTraitProvider typeTraitProvider;
+    private final IMetadataProvider<?, ?> metadataProvider;
+    private final INullWriterFactory nullWriterFactory;
+    private final INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider;
+    private final Object appContext;
+    private final IBinaryBooleanInspectorFactory booleanInspectorFactory;
+    private final IBinaryIntegerInspectorFactory integerInspectorFactory;
+    private final IExpressionRuntimeProvider expressionRuntimeProvider;
+    private final IExpressionTypeComputer expressionTypeComputer;
+    private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+    private final IPartialAggregationTypeComputer partialAggregationTypeComputer;
+    private final IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider;
+    private final int frameSize;
+    private AlgebricksPartitionConstraint clusterLocations;
+    private int varCounter;
+    private final ITypingContext typingContext;
+
+    public JobGenContext(IOperatorSchema outerFlowSchema, IMetadataProvider<?, ?> metadataProvider, Object appContext,
+            ISerializerDeserializerProvider serializerDeserializerProvider,
+            IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider,
+            IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider,
+            IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider,
+            IBinaryBooleanInspectorFactory booleanInspectorFactory,
+            IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider,
+            INullWriterFactory nullWriterFactory,
+            INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider,
+            IExpressionRuntimeProvider expressionRuntimeProvider, IExpressionTypeComputer expressionTypeComputer,
+            INullableTypeComputer nullableTypeComputer, ITypingContext typingContext,
+            IExpressionEvalSizeComputer expressionEvalSizeComputer,
+            IPartialAggregationTypeComputer partialAggregationTypeComputer,
+            IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider, int frameSize,
+            AlgebricksPartitionConstraint clusterLocations) {
+        this.outerFlowSchema = outerFlowSchema;
+        this.metadataProvider = metadataProvider;
+        this.appContext = appContext;
+        this.serializerDeserializerProvider = serializerDeserializerProvider;
+        this.hashFunctionFactoryProvider = hashFunctionFactoryProvider;
+        this.hashFunctionFamilyProvider = hashFunctionFamilyProvider;
+        this.comparatorFactoryProvider = comparatorFactoryProvider;
+        this.typeTraitProvider = typeTraitProvider;
+        this.booleanInspectorFactory = booleanInspectorFactory;
+        this.integerInspectorFactory = integerInspectorFactory;
+        this.printerFactoryProvider = printerFactoryProvider;
+        this.clusterLocations = clusterLocations;
+        this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider;
+        this.nullWriterFactory = nullWriterFactory;
+        this.expressionRuntimeProvider = expressionRuntimeProvider;
+        this.expressionTypeComputer = expressionTypeComputer;
+        this.typingContext = typingContext;
+        this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+        this.partialAggregationTypeComputer = partialAggregationTypeComputer;
+        this.predEvaluatorFactoryProvider = predEvaluatorFactoryProvider;
+        this.frameSize = frameSize;
+        this.varCounter = 0;
+    }
+
+    public IOperatorSchema getOuterFlowSchema() {
+        return outerFlowSchema;
+    }
+
+    public AlgebricksPartitionConstraint getClusterLocations() {
+        return clusterLocations;
+    }
+
+    public IMetadataProvider<?, ?> getMetadataProvider() {
+        return metadataProvider;
+    }
+
+    public Object getAppContext() {
+        return appContext;
+    }
+
+    public ISerializerDeserializerProvider getSerializerDeserializerProvider() {
+        return serializerDeserializerProvider;
+    }
+
+    public IBinaryHashFunctionFactoryProvider getBinaryHashFunctionFactoryProvider() {
+        return hashFunctionFactoryProvider;
+    }
+
+    public IBinaryHashFunctionFamilyProvider getBinaryHashFunctionFamilyProvider() {
+        return hashFunctionFamilyProvider;
+    }
+
+    public IBinaryComparatorFactoryProvider getBinaryComparatorFactoryProvider() {
+        return comparatorFactoryProvider;
+    }
+
+    public ITypeTraitProvider getTypeTraitProvider() {
+        return typeTraitProvider;
+    }
+
+    public IBinaryBooleanInspectorFactory getBinaryBooleanInspectorFactory() {
+        return booleanInspectorFactory;
+    }
+
+    public IBinaryIntegerInspectorFactory getBinaryIntegerInspectorFactory() {
+        return integerInspectorFactory;
+    }
+
+    public IPrinterFactoryProvider getPrinterFactoryProvider() {
+        return printerFactoryProvider;
+    }
+
+    public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() {
+        return predEvaluatorFactoryProvider;
+    }
+
+    public IExpressionRuntimeProvider getExpressionRuntimeProvider() {
+        return expressionRuntimeProvider;
+    }
+
+    public IOperatorSchema getSchema(ILogicalOperator op) {
+        return schemaMap.get(op);
+    }
+
+    public void putSchema(ILogicalOperator op, IOperatorSchema schema) {
+        schemaMap.put(op, schema);
+    }
+
+    public LogicalVariable createNewVar() {
+        varCounter++;
+        LogicalVariable var = new LogicalVariable(-varCounter);
+        return var;
+    }
+
+    public Object getType(ILogicalExpression expr, IVariableTypeEnvironment env) throws AlgebricksException {
+        return expressionTypeComputer.getType(expr, typingContext.getMetadataProvider(), env);
+    }
+
+    public INullWriterFactory getNullWriterFactory() {
+        return nullWriterFactory;
+    }
+
+    public INormalizedKeyComputerFactoryProvider getNormalizedKeyComputerFactoryProvider() {
+        return normalizedKeyComputerFactoryProvider;
+    }
+
+    public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+        return expressionEvalSizeComputer;
+    }
+
+    public int getFrameSize() {
+        return frameSize;
+    }
+
+    public IPartialAggregationTypeComputer getPartialAggregationTypeComputer() {
+        return partialAggregationTypeComputer;
+    }
+
+    public IVariableTypeEnvironment getTypeEnvironment(ILogicalOperator op) {
+        return typingContext.getOutputTypeEnvironment(op);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
new file mode 100644
index 0000000..2d195e4
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenHelper.java
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.logging.Logger;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider;
+import edu.uci.ics.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
+import edu.uci.ics.hyracks.algebricks.data.IPrinterFactoryProvider;
+import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
+import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import edu.uci.ics.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
+import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
+import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
+
+public final class JobGenHelper {
+
+    private static final Logger LOGGER = Logger.getLogger(JobGenHelper.class.getName());
+
+    @SuppressWarnings("rawtypes")
+    public static RecordDescriptor mkRecordDescriptor(IVariableTypeEnvironment env, IOperatorSchema opSchema,
+            JobGenContext context) throws AlgebricksException {
+        ISerializerDeserializer[] fields = new ISerializerDeserializer[opSchema.getSize()];
+        ITypeTraits[] typeTraits = new ITypeTraits[opSchema.getSize()];
+        ISerializerDeserializerProvider sdp = context.getSerializerDeserializerProvider();
+        ITypeTraitProvider ttp = context.getTypeTraitProvider();
+        int i = 0;
+        for (LogicalVariable var : opSchema) {
+            Object t = env.getVarType(var);
+            if (t == null) {
+                LOGGER.warning("No type for variable " + var);
+            }
+            fields[i] = sdp.getSerializerDeserializer(t);
+            typeTraits[i] = ttp.getTypeTrait(t);
+            i++;
+        }
+        return new RecordDescriptor(fields, typeTraits);
+    }
+
+    public static IPrinterFactory[] mkPrinterFactories(IOperatorSchema opSchema, IVariableTypeEnvironment env,
+            JobGenContext context, int[] printColumns) throws AlgebricksException {
+        IPrinterFactory[] pf = new IPrinterFactory[printColumns.length];
+        IPrinterFactoryProvider pff = context.getPrinterFactoryProvider();
+        for (int i = 0; i < pf.length; i++) {
+            LogicalVariable v = opSchema.getVariable(printColumns[i]);
+            Object t = env.getVarType(v);
+            pf[i] = pff.getPrinterFactory(t);
+        }
+        return pf;
+    }
+
+    public static int[] variablesToFieldIndexes(Collection<LogicalVariable> varLogical, IOperatorSchema opSchema) {
+        int[] tuplePos = new int[varLogical.size()];
+        int i = 0;
+        for (LogicalVariable var : varLogical) {
+            tuplePos[i] = opSchema.findVariable(var);
+            i++;
+        }
+        return tuplePos;
+    }
+
+    public static IBinaryHashFunctionFactory[] variablesToBinaryHashFunctionFactories(
+            Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+            throws AlgebricksException {
+        IBinaryHashFunctionFactory[] funFactories = new IBinaryHashFunctionFactory[varLogical.size()];
+        int i = 0;
+        IBinaryHashFunctionFactoryProvider bhffProvider = context.getBinaryHashFunctionFactoryProvider();
+        for (LogicalVariable var : varLogical) {
+            Object type = env.getVarType(var);
+            funFactories[i++] = bhffProvider.getBinaryHashFunctionFactory(type);
+        }
+        return funFactories;
+    }
+
+    public static IBinaryHashFunctionFamily[] variablesToBinaryHashFunctionFamilies(
+            Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+            throws AlgebricksException {
+        IBinaryHashFunctionFamily[] funFamilies = new IBinaryHashFunctionFamily[varLogical.size()];
+        int i = 0;
+        IBinaryHashFunctionFamilyProvider bhffProvider = context.getBinaryHashFunctionFamilyProvider();
+        for (LogicalVariable var : varLogical) {
+            Object type = env.getVarType(var);
+            funFamilies[i++] = bhffProvider.getBinaryHashFunctionFamily(type);
+        }
+        return funFamilies;
+    }
+
+    public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(
+            Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+            throws AlgebricksException {
+        IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[varLogical.size()];
+        IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+        int i = 0;
+        for (LogicalVariable v : varLogical) {
+            Object type = env.getVarType(v);
+            compFactories[i++] = bcfProvider.getBinaryComparatorFactory(type, true);
+        }
+        return compFactories;
+    }
+
+    public static IBinaryComparatorFactory[] variablesToAscBinaryComparatorFactories(List<LogicalVariable> varLogical,
+            int start, int size, IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+        IBinaryComparatorFactory[] compFactories = new IBinaryComparatorFactory[size];
+        IBinaryComparatorFactoryProvider bcfProvider = context.getBinaryComparatorFactoryProvider();
+        for (int i = 0; i < size; i++) {
+            Object type = env.getVarType(varLogical.get(start + i));
+            compFactories[i] = bcfProvider.getBinaryComparatorFactory(type, true);
+        }
+        return compFactories;
+    }
+
+    public static INormalizedKeyComputerFactory variablesToAscNormalizedKeyComputerFactory(
+            Collection<LogicalVariable> varLogical, IVariableTypeEnvironment env, JobGenContext context)
+            throws AlgebricksException {
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        if (nkcfProvider == null)
+            return null;
+        for (LogicalVariable v : varLogical) {
+            Object type = env.getVarType(v);
+            return nkcfProvider.getNormalizedKeyComputerFactory(type, true);
+        }
+        return null;
+    }
+
+    public static ITypeTraits[] variablesToTypeTraits(Collection<LogicalVariable> varLogical,
+            IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+        ITypeTraits[] typeTraits = new ITypeTraits[varLogical.size()];
+        ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
+        int i = 0;
+        for (LogicalVariable v : varLogical) {
+            Object type = env.getVarType(v);
+            typeTraits[i++] = typeTraitProvider.getTypeTrait(type);
+        }
+        return typeTraits;
+    }
+
+    public static ITypeTraits[] variablesToTypeTraits(List<LogicalVariable> varLogical, int start, int size,
+            IVariableTypeEnvironment env, JobGenContext context) throws AlgebricksException {
+        ITypeTraits[] typeTraits = new ITypeTraits[size];
+        ITypeTraitProvider typeTraitProvider = context.getTypeTraitProvider();
+        for (int i = 0; i < size; i++) {
+            Object type = env.getVarType(varLogical.get(start + i));
+            typeTraits[i] = typeTraitProvider.getTypeTrait(type);
+        }
+        return typeTraits;
+    }
+
+    public static int[] projectAllVariables(IOperatorSchema opSchema) {
+        int[] projectionList = new int[opSchema.getSize()];
+        int k = 0;
+        for (LogicalVariable v : opSchema) {
+            projectionList[k++] = opSchema.findVariable(v);
+        }
+        return projectionList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
new file mode 100644
index 0000000..dc1fb35
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/OperatorSchemaImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+
+public class OperatorSchemaImpl implements IOperatorSchema {
+    private final Map<LogicalVariable, Integer> varMap;
+
+    private final List<LogicalVariable> varList;
+
+    public OperatorSchemaImpl() {
+        varMap = new HashMap<LogicalVariable, Integer>();
+        varList = new ArrayList<LogicalVariable>();
+    }
+
+    @Override
+    public void addAllVariables(IOperatorSchema source) {
+        for (LogicalVariable v : source) {
+            varMap.put(v, varList.size());
+            varList.add(v);
+        }
+    }
+
+    @Override
+    public void addAllNewVariables(IOperatorSchema source) {
+        for (LogicalVariable v : source) {
+            if (varMap.get(v) == null) {
+                varMap.put(v, varList.size());
+                varList.add(v);
+            }
+        }
+    }
+
+    @Override
+    public int addVariable(LogicalVariable var) {
+        int idx = varList.size();
+        varMap.put(var, idx);
+        varList.add(var);
+        return idx;
+    }
+
+    @Override
+    public void clear() {
+        varMap.clear();
+        varList.clear();
+    }
+
+    @Override
+    public int findVariable(LogicalVariable var) {
+        Integer i = varMap.get(var);
+        if (i == null) {
+            return -1;
+        }
+        return i;
+    }
+
+    @Override
+    public int getSize() {
+        return varList.size();
+    }
+
+    @Override
+    public LogicalVariable getVariable(int index) {
+        return varList.get(index);
+    }
+
+    @Override
+    public Iterator<LogicalVariable> iterator() {
+        return varList.iterator();
+    }
+
+    @Override
+    public String toString() {
+        return varMap.toString();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
new file mode 100644
index 0000000..3af57ad
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.jobgen.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.api.job.IJobletEventListenerFactory;
+import edu.uci.ics.hyracks.api.job.IOperatorDescriptorRegistry;
+import edu.uci.ics.hyracks.api.job.JobSpecification;
+
+public class PlanCompiler {
+    private JobGenContext context;
+    private Map<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> operatorVisitedToParents = new HashMap<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>>();
+
+    public PlanCompiler(JobGenContext context) {
+        this.context = context;
+    }
+
+    public JobGenContext getContext() {
+        return context;
+    }
+
+    public JobSpecification compilePlan(ILogicalPlan plan, IOperatorSchema outerPlanSchema,
+            IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException {
+        JobSpecification spec = new JobSpecification(context.getFrameSize());
+        if (jobEventListenerFactory != null) {
+            spec.setJobletEventListenerFactory(jobEventListenerFactory);
+        }
+        List<ILogicalOperator> rootOps = new ArrayList<ILogicalOperator>();
+        IHyracksJobBuilder builder = new JobBuilder(spec, context.getClusterLocations());
+        for (Mutable<ILogicalOperator> opRef : plan.getRoots()) {
+            compileOpRef(opRef, spec, builder, outerPlanSchema);
+            rootOps.add(opRef.getValue());
+        }
+        reviseEdges(builder);
+        operatorVisitedToParents.clear();
+        builder.buildSpec(rootOps);
+        spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
+        // Do not do activity cluster planning because it is slow on large clusters
+        spec.setUseConnectorPolicyForScheduling(false);
+        return spec;
+    }
+
+    private void compileOpRef(Mutable<ILogicalOperator> opRef, IOperatorDescriptorRegistry spec,
+            IHyracksJobBuilder builder, IOperatorSchema outerPlanSchema) throws AlgebricksException {
+        ILogicalOperator op = opRef.getValue();
+        int n = op.getInputs().size();
+        IOperatorSchema[] schemas = new IOperatorSchema[n];
+        int i = 0;
+        for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
+            List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2);
+            if (parents == null) {
+                parents = new ArrayList<Mutable<ILogicalOperator>>();
+                operatorVisitedToParents.put(opRef2, parents);
+                parents.add(opRef);
+                compileOpRef(opRef2, spec, builder, outerPlanSchema);
+                schemas[i++] = context.getSchema(opRef2.getValue());
+            } else {
+                if (!parents.contains(opRef))
+                    parents.add(opRef);
+                schemas[i++] = context.getSchema(opRef2.getValue());
+                continue;
+            }
+        }
+
+        IOperatorSchema opSchema = new OperatorSchemaImpl();
+        context.putSchema(op, opSchema);
+        op.getVariablePropagationPolicy().propagateVariables(opSchema, schemas);
+        op.contributeRuntimeOperator(builder, context, opSchema, schemas, outerPlanSchema);
+    }
+
+    private void reviseEdges(IHyracksJobBuilder builder) {
+        /**
+         * revise the edges for the case of replicate operator
+         */
+        for (Entry<Mutable<ILogicalOperator>, List<Mutable<ILogicalOperator>>> entry : operatorVisitedToParents
+                .entrySet()) {
+            Mutable<ILogicalOperator> child = entry.getKey();
+            List<Mutable<ILogicalOperator>> parents = entry.getValue();
+            if (parents.size() > 1) {
+                if (child.getValue().getOperatorTag() == LogicalOperatorTag.REPLICATE) {
+                    ReplicateOperator rop = (ReplicateOperator) child.getValue();
+                    if (rop.isBlocker()) {
+                        // make the order of the graph edges consistent with the order of rop's outputs
+                        List<Mutable<ILogicalOperator>> outputs = rop.getOutputs();
+                        for (Mutable<ILogicalOperator> parent : parents) {
+                            builder.contributeGraphEdge(child.getValue(), outputs.indexOf(parent), parent.getValue(), 0);
+                        }
+                    } else {
+                        int i = 0;
+                        for (Mutable<ILogicalOperator> parent : parents) {
+                            builder.contributeGraphEdge(child.getValue(), i, parent.getValue(), 0);
+                            i++;
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
new file mode 100644
index 0000000..e7b469a
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.Collection;
+import java.util.logging.Level;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+
+public abstract class AbstractRuleController {
+
+    protected IOptimizationContext context;
+
+    public AbstractRuleController() {
+    }
+
+    public void setContext(IOptimizationContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Each rewriting strategy may differ in the
+     * 
+     * @param root
+     * @param ruleClasses
+     * @return true iff one of the rules in the collection fired
+     */
+    public abstract boolean rewriteWithRuleCollection(Mutable<ILogicalOperator> root,
+            Collection<IAlgebraicRewriteRule> rules) throws AlgebricksException;
+
+    /**
+     * @param opRef
+     * @param rule
+     * @return true if any rewrite was fired, either on opRef or any operator
+     *         under it.
+     */
+    protected boolean rewriteOperatorRef(Mutable<ILogicalOperator> opRef, IAlgebraicRewriteRule rule)
+            throws AlgebricksException {
+        return rewriteOperatorRef(opRef, rule, true, false);
+    }
+
+    private String getPlanString(Mutable<ILogicalOperator> opRef) throws AlgebricksException {
+        if (AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(Level.FINE)) {
+            StringBuilder sb = new StringBuilder();
+            LogicalOperatorPrettyPrintVisitor pvisitor = context.getPrettyPrintVisitor();
+            PlanPrettyPrinter.printOperator((AbstractLogicalOperator) opRef.getValue(), sb, pvisitor, 0);
+            return sb.toString();
+        }
+        return null;
+    }
+
+    private void printRuleApplication(IAlgebraicRewriteRule rule, String beforePlan, String afterPlan)
+            throws AlgebricksException {
+        if (AlgebricksConfig.ALGEBRICKS_LOGGER.isLoggable(Level.FINE)) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Rule " + rule.getClass() + " fired.\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Before plan\n" + beforePlan + "\n");
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> After plan\n" + afterPlan + "\n");
+        }
+    }
+
+    protected boolean rewriteOperatorRef(Mutable<ILogicalOperator> opRef, IAlgebraicRewriteRule rule,
+            boolean enterNestedPlans, boolean fullDFS) throws AlgebricksException {
+
+        String preBeforePlan = getPlanString(opRef);
+        if (rule.rewritePre(opRef, context)) {
+            String preAfterPlan = getPlanString(opRef);
+            printRuleApplication(rule, preBeforePlan, preAfterPlan);
+            return true;
+        }
+        boolean rewritten = false;
+        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
+
+        for (Mutable<ILogicalOperator> inp : op.getInputs()) {
+            if (rewriteOperatorRef(inp, rule, enterNestedPlans, fullDFS)) {
+                rewritten = true;
+                if (!fullDFS) {
+                    break;
+                }
+            }
+        }
+
+        if (op.hasNestedPlans() && enterNestedPlans) {
+            AbstractOperatorWithNestedPlans o2 = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : o2.getNestedPlans()) {
+                for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                    if (rewriteOperatorRef(r, rule, enterNestedPlans, fullDFS)) {
+                        rewritten = true;
+                        if (!fullDFS) {
+                            break;
+                        }
+                    }
+                }
+                if (rewritten && !fullDFS) {
+                    break;
+                }
+            }
+        }
+
+        String postBeforePlan = getPlanString(opRef);
+        if (rule.rewritePost(opRef, context)) {
+            String postAfterPlan = getPlanString(opRef);
+            printRuleApplication(rule, postBeforePlan, postAfterPlan);
+            return true;
+        }
+
+        return rewritten;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
new file mode 100644
index 0000000..361dad3
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableEvalSizeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.LogicalOperatorPrettyPrintVisitor;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
+
+public class AlgebricksOptimizationContext implements IOptimizationContext {
+
+    private int varCounter;
+    private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
+    private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
+    private final PhysicalOptimizationConfig physicalOptimizationConfig;
+    private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() {
+
+        Map<LogicalVariable, Integer> varSizeMap = new HashMap<LogicalVariable, Integer>();
+
+        @Override
+        public void setVariableEvalSize(LogicalVariable var, int size) {
+            varSizeMap.put(var, size);
+        }
+
+        @Override
+        public int getVariableEvalSize(LogicalVariable var) {
+            return varSizeMap.get(var);
+        }
+    };
+
+    private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<ILogicalOperator, IVariableTypeEnvironment>();
+
+    private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<ILogicalOperator, HashSet<ILogicalOperator>>();
+    private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
+    private Map<LogicalVariable, FunctionalDependency> recordToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
+
+    @SuppressWarnings("unchecked")
+    private IMetadataProvider metadataProvider;
+    private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
+
+    protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<ILogicalOperator, List<FunctionalDependency>>();
+    protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>();
+
+    protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
+    private final IExpressionTypeComputer expressionTypeComputer;
+    private final INullableTypeComputer nullableTypeComputer;
+    private final LogicalOperatorPrettyPrintVisitor prettyPrintVisitor;
+
+    public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+            IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
+            PhysicalOptimizationConfig physicalOptimizationConfig) {
+        this(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer,
+                nullableTypeComputer, physicalOptimizationConfig, new LogicalOperatorPrettyPrintVisitor());
+    }
+
+    public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer,
+            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+            IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
+            PhysicalOptimizationConfig physicalOptimizationConfig, LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
+        this.varCounter = varCounter;
+        this.expressionEvalSizeComputer = expressionEvalSizeComputer;
+        this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
+        this.expressionTypeComputer = expressionTypeComputer;
+        this.nullableTypeComputer = nullableTypeComputer;
+        this.physicalOptimizationConfig = physicalOptimizationConfig;
+        this.prettyPrintVisitor = prettyPrintVisitor;
+    }
+
+    public int getVarCounter() {
+        return varCounter;
+    }
+
+    public void setVarCounter(int varCounter) {
+        this.varCounter = varCounter;
+    }
+
+    public LogicalVariable newVar() {
+        varCounter++;
+        LogicalVariable var = new LogicalVariable(varCounter);
+        return var;
+    }
+
+    @SuppressWarnings("unchecked")
+    public IMetadataProvider getMetadataProvider() {
+        return metadataProvider;
+    }
+
+    public void setMetadataDeclarations(IMetadataProvider<?, ?> metadataProvider) {
+        this.metadataProvider = metadataProvider;
+    }
+
+    public boolean checkIfInDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
+        HashSet<ILogicalOperator> operators = dontApply.get(rule);
+        if (operators == null) {
+            return false;
+        } else {
+            return operators.contains(op);
+        }
+    }
+
+    public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
+        HashSet<ILogicalOperator> operators = dontApply.get(rule);
+        if (operators == null) {
+            HashSet<ILogicalOperator> os = new HashSet<ILogicalOperator>();
+            os.add(op);
+            dontApply.put(rule, os);
+        } else {
+            operators.add(op);
+        }
+
+    }
+
+    /*
+     * returns true if op1 and op2 have already been compared
+     */
+    @Override
+    public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
+        HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
+        if (ops == null) {
+            HashSet<ILogicalOperator> newEntry = new HashSet<ILogicalOperator>();
+            newEntry.add(op2);
+            alreadyCompared.put(op1, newEntry);
+            return false;
+        } else {
+            if (ops.contains(op2)) {
+                return true;
+            } else {
+                ops.add(op2);
+                return false;
+            }
+        }
+    }
+    
+    @Override
+    public void removeFromAlreadyCompared(ILogicalOperator op1) {
+        alreadyCompared.remove(op1);
+    }
+
+    public void addNotToBeInlinedVar(LogicalVariable var) {
+        notToBeInlinedVars.add(var);
+    }
+
+    public boolean shouldNotBeInlined(LogicalVariable var) {
+        return notToBeInlinedVars.contains(var);
+    }
+
+    public void addPrimaryKey(FunctionalDependency pk) {
+        assert (pk.getTail().size() == 1);
+        LogicalVariable recordVar = pk.getTail().get(0);
+        recordToPrimaryKey.put(recordVar, pk);
+    }
+
+    public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
+        FunctionalDependency fd = recordToPrimaryKey.get(recordVar);
+        if (fd == null) {
+            return null;
+        }
+        return fd.getHead();
+    }
+
+    @Override
+    public Map<LogicalVariable, EquivalenceClass> getEquivalenceClassMap(ILogicalOperator op) {
+        return eqClassGlobalMap.get(op);
+    }
+
+    @Override
+    public List<FunctionalDependency> getFDList(ILogicalOperator op) {
+        return fdGlobalMap.get(op);
+    }
+
+    @Override
+    public void putEquivalenceClassMap(ILogicalOperator op, Map<LogicalVariable, EquivalenceClass> eqClassMap) {
+        this.eqClassGlobalMap.put(op, eqClassMap);
+    }
+
+    @Override
+    public void putFDList(ILogicalOperator op, List<FunctionalDependency> fdList) {
+        this.fdGlobalMap.put(op, fdList);
+    }
+
+    @Override
+    public ILogicalPropertiesVector getLogicalPropertiesVector(ILogicalOperator op) {
+        return logicalProps.get(op);
+    }
+
+    @Override
+    public void putLogicalPropertiesVector(ILogicalOperator op, ILogicalPropertiesVector v) {
+        logicalProps.put(op, v);
+    }
+
+    @Override
+    public IExpressionEvalSizeComputer getExpressionEvalSizeComputer() {
+        return expressionEvalSizeComputer;
+    }
+
+    @Override
+    public IVariableEvalSizeEnvironment getVariableEvalSizeEnvironment() {
+        return varEvalSizeEnv;
+    }
+
+    public IMergeAggregationExpressionFactory getMergeAggregationExpressionFactory() {
+        return mergeAggregationExpressionFactory;
+    }
+
+    public PhysicalOptimizationConfig getPhysicalOptimizationConfig() {
+        return physicalOptimizationConfig;
+    }
+
+    @Override
+    public IVariableTypeEnvironment getOutputTypeEnvironment(ILogicalOperator op) {
+        return typeEnvMap.get(op);
+    }
+
+    @Override
+    public void setOutputTypeEnvironment(ILogicalOperator op, IVariableTypeEnvironment env) {
+        typeEnvMap.put(op, env);
+    }
+
+    @Override
+    public IExpressionTypeComputer getExpressionTypeComputer() {
+        return expressionTypeComputer;
+    }
+
+    @Override
+    public INullableTypeComputer getNullableTypeComputer() {
+        return nullableTypeComputer;
+    }
+
+    @Override
+    public void invalidateTypeEnvironmentForOperator(ILogicalOperator op) {
+        typeEnvMap.put(op, null);
+    }
+
+    @Override
+    public void computeAndSetTypeEnvironmentForOperator(ILogicalOperator op) throws AlgebricksException {
+        setOutputTypeEnvironment(op, op.computeOutputTypeEnvironment(this));
+    }
+
+    @Override
+    public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars) {
+        for (Map.Entry<LogicalVariable, FunctionalDependency> me : recordToPrimaryKey.entrySet()) {
+            FunctionalDependency fd = me.getValue();
+            List<LogicalVariable> hd = new ArrayList<LogicalVariable>();
+            for (LogicalVariable v : fd.getHead()) {
+                LogicalVariable v2 = mappedVars.get(v);
+                if (v2 == null) {
+                    hd.add(v);
+                } else {
+                    hd.add(v2);
+                }
+            }
+            List<LogicalVariable> tl = new ArrayList<LogicalVariable>();
+            for (LogicalVariable v : fd.getTail()) {
+                LogicalVariable v2 = mappedVars.get(v);
+                if (v2 == null) {
+                    tl.add(v);
+                } else {
+                    tl.add(v2);
+                }
+            }
+            me.setValue(new FunctionalDependency(hd, tl));
+        }
+    }
+    
+    @Override
+    public LogicalOperatorPrettyPrintVisitor getPrettyPrintVisitor() {
+        return prettyPrintVisitor;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
new file mode 100644
index 0000000..17c6900
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractOperatorWithNestedPlans;
+import edu.uci.ics.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+
+public class HeuristicOptimizer {
+
+    public static PhysicalOperatorTag[] hyraxOperators = new PhysicalOperatorTag[] {
+            PhysicalOperatorTag.DATASOURCE_SCAN, PhysicalOperatorTag.BTREE_SEARCH,
+            PhysicalOperatorTag.EXTERNAL_GROUP_BY, PhysicalOperatorTag.HASH_GROUP_BY, PhysicalOperatorTag.HDFS_READER,
+            PhysicalOperatorTag.HYBRID_HASH_JOIN, PhysicalOperatorTag.IN_MEMORY_HASH_JOIN,
+            PhysicalOperatorTag.NESTED_LOOP, PhysicalOperatorTag.PRE_SORTED_DISTINCT_BY,
+            PhysicalOperatorTag.PRE_CLUSTERED_GROUP_BY, PhysicalOperatorTag.SPLIT, PhysicalOperatorTag.STABLE_SORT,
+            PhysicalOperatorTag.UNION_ALL };
+    public static PhysicalOperatorTag[] hyraxOperatorsBelowWhichJobGenIsDisabled = new PhysicalOperatorTag[] {};
+
+    public static boolean isHyraxOp(PhysicalOperatorTag opTag) {
+        for (PhysicalOperatorTag t : hyraxOperators) {
+            if (t == opTag) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private IOptimizationContext context;
+    private List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites;
+    private List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites;
+    private ILogicalPlan plan;
+
+    public HeuristicOptimizer(ILogicalPlan plan,
+            List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites,
+            List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites,
+            IOptimizationContext context) {
+        this.plan = plan;
+        this.context = context;
+        this.logicalRewrites = logicalRewrites;
+        this.physicalRewrites = physicalRewrites;
+    }
+
+    public void optimize() throws AlgebricksException {
+        if (plan == null) {
+            return;
+        }
+        if (AlgebricksConfig.DEBUG) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting logical optimizations.\n");
+        }
+
+        StringBuilder sb = new StringBuilder();
+        PlanPrettyPrinter.printPlan(plan, sb, context.getPrettyPrintVisitor(), 0);
+        AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Logical Plan:\n" + sb.toString());
+        runOptimizationSets(plan, logicalRewrites);
+        computeSchemaBottomUpForPlan(plan);
+        runPhysicalOptimizations(plan, physicalRewrites);
+        StringBuilder sb2 = new StringBuilder();
+        PlanPrettyPrinter.printPlan(plan, sb2, context.getPrettyPrintVisitor(), 0);
+        AlgebricksConfig.ALGEBRICKS_LOGGER.info("Optimized Plan:\n" + sb2.toString());
+    }
+
+    private void runOptimizationSets(ILogicalPlan plan,
+            List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> optimSet) throws AlgebricksException {
+        for (Pair<AbstractRuleController, List<IAlgebraicRewriteRule>> ruleList : optimSet) {
+            for (Mutable<ILogicalOperator> r : plan.getRoots()) {
+                ruleList.first.setContext(context);
+                ruleList.first.rewriteWithRuleCollection(r, ruleList.second);
+            }
+        }
+    }
+
+    private static void computeSchemaBottomUpForPlan(ILogicalPlan p) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> r : p.getRoots()) {
+            computeSchemaBottomUpForOp((AbstractLogicalOperator) r.getValue());
+        }
+    }
+
+    private static void computeSchemaBottomUpForOp(AbstractLogicalOperator op) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> i : op.getInputs()) {
+            computeSchemaBottomUpForOp((AbstractLogicalOperator) i.getValue());
+        }
+        if (op.hasNestedPlans()) {
+            AbstractOperatorWithNestedPlans a = (AbstractOperatorWithNestedPlans) op;
+            for (ILogicalPlan p : a.getNestedPlans()) {
+                computeSchemaBottomUpForPlan(p);
+            }
+        }
+        op.recomputeSchema();
+    }
+
+    private void runPhysicalOptimizations(ILogicalPlan plan,
+            List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites)
+            throws AlgebricksException {
+        if (AlgebricksConfig.DEBUG) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine("Starting physical optimizations.\n");
+        }
+        // PhysicalOptimizationsUtil.computeFDsAndEquivalenceClasses(plan);
+        runOptimizationSets(plan, physicalRewrites);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
new file mode 100644
index 0000000..6bf0b72
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IAlgebraicRewriteRule.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+
+public interface IAlgebraicRewriteRule {
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException;
+
+    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
new file mode 100644
index 0000000..70b7f64
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionEvalSizeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IMergeAggregationExpressionFactory;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.INullableTypeComputer;
+
+public interface IOptimizationContextFactory {
+    public IOptimizationContext createOptimizationContext(int varCounter,
+            IExpressionEvalSizeComputer expressionEvalSizeComputer,
+            IMergeAggregationExpressionFactory mergeAggregationExpressionFactory,
+            IExpressionTypeComputer expressionTypeComputer, INullableTypeComputer nullableTypeComputer,
+            PhysicalOptimizationConfig physicalOptimizationConfig);
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
new file mode 100644
index 0000000..47d2266
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/PhysicalOptimizationConfig.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.rewriter.base;
+
+import java.util.Properties;
+
+public class PhysicalOptimizationConfig {
+    private static final int MB = 1048576;
+    
+    private static final String FRAMESIZE = "FRAMESIZE";
+    private static final String MAX_FRAMES_EXTERNAL_SORT = "MAX_FRAMES_EXTERNAL_SORT";
+    private static final String MAX_FRAMES_EXTERNAL_GROUP_BY = "MAX_FRAMES_EXTERNAL_GROUP_BY";
+    private static final String MAX_FRAMES_LEFT_INPUT_HYBRID_HASH = "MAX_FRAMES_LEFT_INPUT_HYBRID_HASH";
+    private static final String MAX_FRAMES_HYBRID_HASH = "MAX_FRAMES_HYBRID_HASH";
+    private static final String FUDGE_FACTOR = "FUDGE_FACTOR";
+    private static final String MAX_RECORDS_PER_FRAME = "MAX_RECORDS_PER_FRAME";
+    
+    private static final String DEFAULT_HASH_GROUP_TABLE_SIZE = "DEFAULT_HASH_GROUP_TABLE_SIZE";
+    private static final String DEFAULT_EXTERNAL_GROUP_TABLE_SIZE = "DEFAULT_EXTERNAL_GROUP_TABLE_SIZE";
+    private static final String DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE = "DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE";
+
+    private Properties properties = new Properties();
+
+    public PhysicalOptimizationConfig() {
+        int frameSize = 32768;
+        setInt(FRAMESIZE, frameSize);
+        setInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+        setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 32 * MB) / frameSize));
+
+        // use http://www.rsok.com/~jrm/printprimes.html to find prime numbers
+        setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
+        setInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, 10485767);
+        setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, 10485767);
+    }
+
+    public int getFrameSize() {
+        return getInt(FRAMESIZE, 32768);
+    }
+
+    public void setFrameSize(int frameSize) {
+        setInt(FRAMESIZE, frameSize);
+    }
+    
+    public double getFudgeFactor() {
+        return getDouble(FUDGE_FACTOR, 1.3);
+    }
+
+    public void setFudgeFactor(double fudgeFactor) {
+        setDouble(FUDGE_FACTOR, fudgeFactor);
+    }
+    
+    public int getMaxRecordsPerFrame() {
+        return getInt(MAX_RECORDS_PER_FRAME, 512);
+    }
+
+    public void setMaxRecordsPerFrame(int maxRecords) {
+        setInt(MAX_RECORDS_PER_FRAME, maxRecords);
+    }
+
+    public int getMaxFramesLeftInputHybridHash() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, (int) (140L * 1024 * MB / frameSize));
+    }
+
+    public void setMaxFramesLeftInputHybridHash(int frameLimit) {
+        setInt(MAX_FRAMES_LEFT_INPUT_HYBRID_HASH, frameLimit);
+    }
+    
+    public int getMaxFramesHybridHash() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_HYBRID_HASH, (int) (64L * MB / frameSize));
+    }
+
+    public void setMaxFramesHybridHash(int frameLimit) {
+        setInt(MAX_FRAMES_HYBRID_HASH, frameLimit);
+    }
+
+    public int getMaxFramesExternalGroupBy() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_EXTERNAL_GROUP_BY, (int) (((long) 256 * MB) / frameSize));
+    }
+
+    public void setMaxFramesExternalGroupBy(int frameLimit) {
+        setInt(MAX_FRAMES_EXTERNAL_GROUP_BY, frameLimit);
+    }
+    
+    public int getMaxFramesExternalSort() {
+        int frameSize = getFrameSize();
+        return getInt(MAX_FRAMES_EXTERNAL_SORT, (int) (((long) 32 * MB) / frameSize));
+    }
+
+    public void setMaxFramesExternalSort(int frameLimit) {
+        setInt(MAX_FRAMES_EXTERNAL_SORT, frameLimit);
+    }
+
+    public int getHashGroupByTableSize() {
+        return getInt(DEFAULT_HASH_GROUP_TABLE_SIZE, 10485767);
+    }
+
+    public void setHashGroupByTableSize(int tableSize) {
+        setInt(DEFAULT_HASH_GROUP_TABLE_SIZE, tableSize);
+    }
+
+    public int getExternalGroupByTableSize() {
+        return getInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, 10485767);
+    }
+
+    public void setExternalGroupByTableSize(int tableSize) {
+        setInt(DEFAULT_EXTERNAL_GROUP_TABLE_SIZE, tableSize);
+    }
+
+    public int getInMemHashJoinTableSize() {
+        return getInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, 10485767);
+    }
+
+    public void setInMemHashJoinTableSize(int tableSize) {
+        setInt(DEFAULT_IN_MEM_HASH_JOIN_TABLE_SIZE, tableSize);
+    }
+
+    private void setInt(String property, int value) {
+        properties.setProperty(property, Integer.toString(value));
+    }
+
+    private int getInt(String property, int defaultValue) {
+        String value = properties.getProperty(property);
+        if (value == null)
+            return defaultValue;
+        else
+            return Integer.parseInt(value);
+    }
+    
+    private void setDouble(String property, double value) {
+        properties.setProperty(property, Double.toString(value));
+    }
+
+    private double getDouble(String property, double defaultValue) {
+        String value = properties.getProperty(property);
+        if (value == null)
+            return defaultValue;
+        else
+            return Double.parseDouble(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
new file mode 100644
index 0000000..3ef4316
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/Substitution.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.utils;
+
+public class Substitution<T> {
+    public T substituted;
+    public T substitutedWith;
+
+    public Substitution(T substituted, T substitutedWith) {
+        this.substituted = substituted;
+        this.substitutedWith = substitutedWith;
+    }
+}