You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/08/25 18:41:43 UTC

[30/51] [partial] incubator-asterixdb-hyracks git commit: Change folder structure for Java repackage

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
new file mode 100644
index 0000000..1d7f134
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -0,0 +1,763 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.exceptions.NotImplementedException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.EquivalenceClass;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.UnnestingFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractUnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalGroupingProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import edu.uci.ics.hyracks.algebricks.core.config.AlgebricksConfig;
+
+public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void, IOptimizationContext> {
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+        ctx.putFDList(op, new ArrayList<FunctionalDependency>());
+        return null;
+    }
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        ILogicalOperator inp1 = op.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrComputeEqClasses(inp1, ctx);
+        ctx.putEquivalenceClassMap(op, eqClasses);
+
+        // Propagates equivalence classes that from expressions.
+        // Note that an equivalence class can also contain expression members.
+        propagateEquivalenceFromExpressionsToVariables(eqClasses, op.getExpressions(), op.getVariables());
+
+        // Generates FDs.
+        List<LogicalVariable> used = new ArrayList<LogicalVariable>();
+        VariableUtilities.getUsedVariables(op, used);
+        List<FunctionalDependency> fds1 = getOrComputeFDs(inp1, ctx);
+        List<FunctionalDependency> eFds = new ArrayList<FunctionalDependency>(fds1.size());
+        for (FunctionalDependency fd : fds1) {
+            if (fd.getTail().containsAll(used)) {
+                List<LogicalVariable> hd = new ArrayList<LogicalVariable>(fd.getHead());
+                List<LogicalVariable> tl = new ArrayList<LogicalVariable>(fd.getTail());
+                tl.addAll(op.getVariables());
+                FunctionalDependency fd2 = new FunctionalDependency(hd, tl);
+                eFds.add(fd2);
+            } else {
+                eFds.add(fd);
+            }
+        }
+        ctx.putFDList(op, eFds);
+        return null;
+    }
+
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        ILogicalOperator inp1 = op.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrCreateEqClasses(op, ctx);
+        Map<LogicalVariable, EquivalenceClass> propagatedEqClasses = getOrComputeEqClasses(inp1, ctx);
+        eqClasses.putAll(propagatedEqClasses);
+        ctx.putEquivalenceClassMap(op, eqClasses);
+        List<FunctionalDependency> fds = new ArrayList<FunctionalDependency>(getOrComputeFDs(inp1, ctx));
+        ctx.putFDList(op, fds);
+        op.getDataSource().computeFDs(op.getVariables(), fds);
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        ILogicalOperator op0 = op.getInputs().get(0).getValue();
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ctx.putFDList(op, functionalDependencies);
+        for (FunctionalDependency inherited : getOrComputeFDs(op0, ctx)) {
+            boolean isCoveredByDistinctByVars = true;
+            for (LogicalVariable v : inherited.getHead()) {
+                if (!op.isDistinctByVar(v)) {
+                    isCoveredByDistinctByVars = false;
+                }
+            }
+            if (isCoveredByDistinctByVars) {
+                List<LogicalVariable> newTail = new ArrayList<LogicalVariable>();
+                for (LogicalVariable v2 : inherited.getTail()) {
+                    if (op.isDistinctByVar(v2)) {
+                        newTail.add(v2);
+                    }
+                }
+                if (!newTail.isEmpty()) {
+                    List<LogicalVariable> newHead = new ArrayList<LogicalVariable>(inherited.getHead());
+                    FunctionalDependency newFd = new FunctionalDependency(newHead, newTail);
+                    functionalDependencies.add(newFd);
+                }
+            }
+        }
+        Set<LogicalVariable> gbySet = new HashSet<LogicalVariable>();
+        List<Mutable<ILogicalExpression>> expressions = op.getExpressions();
+        for (Mutable<ILogicalExpression> pRef : expressions) {
+            ILogicalExpression p = pRef.getValue();
+            if (p.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression v = (VariableReferenceExpression) p;
+                gbySet.add(v.getVariableReference());
+            }
+        }
+        LocalGroupingProperty lgp = new LocalGroupingProperty(gbySet);
+
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = getOrComputeEqClasses(op0, ctx);
+        ctx.putEquivalenceClassMap(op, equivalenceClasses);
+
+        lgp.normalizeGroupingColumns(equivalenceClasses, functionalDependencies);
+        Set<LogicalVariable> normSet = lgp.getColumnSet();
+        List<Mutable<ILogicalExpression>> newDistinctByList = new ArrayList<Mutable<ILogicalExpression>>();
+        for (Mutable<ILogicalExpression> p2Ref : expressions) {
+            ILogicalExpression p2 = p2Ref.getValue();
+            if (p2.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression var2 = (VariableReferenceExpression) p2;
+                LogicalVariable v2 = var2.getVariableReference();
+                if (normSet.contains(v2)) {
+                    newDistinctByList.add(p2Ref);
+                }
+            } else {
+                newDistinctByList.add(p2Ref);
+            }
+        }
+        expressions.clear();
+        expressions.addAll(newDistinctByList);
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+        ctx.putFDList(op, new ArrayList<FunctionalDependency>());
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ctx.putEquivalenceClassMap(op, equivalenceClasses);
+        ctx.putFDList(op, functionalDependencies);
+
+        List<FunctionalDependency> inheritedFDs = new ArrayList<FunctionalDependency>();
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                ILogicalOperator op2 = r.getValue();
+                equivalenceClasses.putAll(getOrComputeEqClasses(op2, ctx));
+                inheritedFDs.addAll(getOrComputeFDs(op2, ctx));
+            }
+        }
+
+        ILogicalOperator op0 = op.getInputs().get(0).getValue();
+        inheritedFDs.addAll(getOrComputeFDs(op0, ctx));
+        Map<LogicalVariable, EquivalenceClass> inheritedEcs = getOrComputeEqClasses(op0, ctx);
+        for (FunctionalDependency inherited : inheritedFDs) {
+            boolean isCoveredByGbyOrDecorVars = true;
+            List<LogicalVariable> newHead = new ArrayList<LogicalVariable>(inherited.getHead().size());
+            for (LogicalVariable v : inherited.getHead()) {
+                LogicalVariable vnew = getNewGbyVar(op, v);
+                if (vnew == null) {
+                    vnew = getNewDecorVar(op, v);
+                    if (vnew == null) {
+                        isCoveredByGbyOrDecorVars = false;
+                    }
+                    break;
+                }
+                newHead.add(vnew);
+            }
+
+            if (isCoveredByGbyOrDecorVars) {
+                List<LogicalVariable> newTail = new ArrayList<LogicalVariable>();
+                for (LogicalVariable v2 : inherited.getTail()) {
+                    LogicalVariable v3 = getNewGbyVar(op, v2);
+                    if (v3 != null) {
+                        newTail.add(v3);
+                    }
+                }
+                if (!newTail.isEmpty()) {
+                    FunctionalDependency newFd = new FunctionalDependency(newHead, newTail);
+                    functionalDependencies.add(newFd);
+                }
+            }
+        }
+
+        List<LogicalVariable> premiseGby = new LinkedList<LogicalVariable>();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> gByList = op.getGroupByList();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+            premiseGby.add(p.first);
+        }
+
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> decorList = op.getDecorList();
+
+        LinkedList<LogicalVariable> conclDecor = new LinkedList<LogicalVariable>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : decorList) {
+            conclDecor.add(GroupByOperator.getDecorVariable(p));
+        }
+        if (!conclDecor.isEmpty()) {
+            functionalDependencies.add(new FunctionalDependency(premiseGby, conclDecor));
+        }
+
+        Set<LogicalVariable> gbySet = new HashSet<LogicalVariable>();
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression v = (VariableReferenceExpression) expr;
+                gbySet.add(v.getVariableReference());
+            }
+        }
+        LocalGroupingProperty lgp = new LocalGroupingProperty(gbySet);
+        lgp.normalizeGroupingColumns(inheritedEcs, inheritedFDs);
+        Set<LogicalVariable> normSet = lgp.getColumnSet();
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> newGbyList = new ArrayList<Pair<LogicalVariable, Mutable<ILogicalExpression>>>();
+        boolean changed = false;
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : gByList) {
+            ILogicalExpression expr = p.second.getValue();
+            if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                VariableReferenceExpression varRef = (VariableReferenceExpression) expr;
+                LogicalVariable v2 = varRef.getVariableReference();
+                EquivalenceClass ec2 = inheritedEcs.get(v2);
+                LogicalVariable v3;
+                if (ec2 != null && !ec2.representativeIsConst()) {
+                    v3 = ec2.getVariableRepresentative();
+                } else {
+                    v3 = v2;
+                }
+                if (normSet.contains(v3)) {
+                    newGbyList.add(p);
+                } else {
+                    changed = true;
+                    decorList.add(p);
+                }
+            } else {
+                newGbyList.add(p);
+            }
+        }
+        if (changed) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Group-by list changed from "
+                    + GroupByOperator.veListToString(gByList) + " to " + GroupByOperator.veListToString(newGbyList)
+                    + ".\n");
+        }
+        gByList.clear();
+        gByList.addAll(newGbyList);
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ctx.putEquivalenceClassMap(op, equivalenceClasses);
+        ctx.putFDList(op, functionalDependencies);
+        ILogicalOperator op0 = op.getInputs().get(0).getValue();
+        ILogicalOperator op1 = op.getInputs().get(1).getValue();
+        functionalDependencies.addAll(getOrComputeFDs(op0, ctx));
+        functionalDependencies.addAll(getOrComputeFDs(op1, ctx));
+        equivalenceClasses.putAll(getOrComputeEqClasses(op0, ctx));
+        equivalenceClasses.putAll(getOrComputeEqClasses(op1, ctx));
+        ILogicalExpression expr = op.getCondition().getValue();
+        expr.getConstraintsAndEquivClasses(functionalDependencies, equivalenceClasses);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ctx.putEquivalenceClassMap(op, equivalenceClasses);
+        ctx.putFDList(op, functionalDependencies);
+        ILogicalOperator opLeft = op.getInputs().get(0).getValue();
+        ILogicalOperator opRight = op.getInputs().get(1).getValue();
+        functionalDependencies.addAll(getOrComputeFDs(opLeft, ctx));
+        functionalDependencies.addAll(getOrComputeFDs(opRight, ctx));
+        equivalenceClasses.putAll(getOrComputeEqClasses(opLeft, ctx));
+        equivalenceClasses.putAll(getOrComputeEqClasses(opRight, ctx));
+
+        Collection<LogicalVariable> leftSideVars;
+        if (opLeft.getSchema() == null) {
+            leftSideVars = new LinkedList<LogicalVariable>();
+            VariableUtilities.getLiveVariables(opLeft, leftSideVars);
+            // actually, not all produced vars. are visible (due to projection)
+            // so using cached schema is better and faster
+        } else {
+            leftSideVars = opLeft.getSchema();
+        }
+        ILogicalExpression expr = op.getCondition().getValue();
+        expr.getConstraintsForOuterJoin(functionalDependencies, leftSideVars);
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        AbstractLogicalOperator op1 = (AbstractLogicalOperator) op.getDataSourceReference().getValue();
+        ILogicalOperator inp1 = op1.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrComputeEqClasses(inp1, ctx);
+        ctx.putEquivalenceClassMap(op, eqClasses);
+        List<FunctionalDependency> fds = new ArrayList<FunctionalDependency>(getOrComputeFDs(inp1, ctx));
+        if (op1.getOperatorTag() == LogicalOperatorTag.GROUP) {
+            GroupByOperator gby = (GroupByOperator) op1;
+            LinkedList<LogicalVariable> tail = new LinkedList<LogicalVariable>();
+            for (LogicalVariable v : gby.getGbyVarList()) {
+                tail.add(v);
+                // all values for gby vars. are the same
+            }
+            FunctionalDependency gbyfd = new FunctionalDependency(new LinkedList<LogicalVariable>(), tail);
+            fds.add(gbyfd);
+        }
+        ctx.putFDList(op, fds);
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClassesForUsedVars(op, ctx, op.getVariables());
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        ctx.putEquivalenceClassMap(op, new HashMap<LogicalVariable, EquivalenceClass>());
+        ctx.putFDList(op, new ArrayList<FunctionalDependency>());
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClassesForUsedVars(op, ctx, op.getInputVariables());
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        ILogicalOperator childOp = op.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = getOrComputeEqClasses(childOp, ctx);
+        ctx.putEquivalenceClassMap(op, equivalenceClasses);
+
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ctx.putFDList(op, functionalDependencies);
+        functionalDependencies.addAll(getOrComputeFDs(childOp, ctx));
+        equivalenceClasses.putAll(getOrComputeEqClasses(childOp, ctx));
+        ILogicalExpression expr = op.getCondition().getValue();
+        expr.getConstraintsAndEquivClasses(functionalDependencies, equivalenceClasses);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> equivalenceClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+        List<FunctionalDependency> functionalDependencies = new ArrayList<FunctionalDependency>();
+        ctx.putEquivalenceClassMap(op, equivalenceClasses);
+        ctx.putFDList(op, functionalDependencies);
+        for (ILogicalPlan p : op.getNestedPlans()) {
+            for (Mutable<ILogicalOperator> r : p.getRoots()) {
+                ILogicalOperator op2 = r.getValue();
+                equivalenceClasses.putAll(getOrComputeEqClasses(op2, ctx));
+                functionalDependencies.addAll(getOrComputeFDs(op2, ctx));
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        fdsEqClassesForAbstractUnnestOperator(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        fdsEqClassesForAbstractUnnestOperator(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        // propagateFDsAndEquivClasses(op, ctx);
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        // propagateFDsAndEquivClasses(op, ctx);
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        // propagateFDsAndEquivClasses(op, ctx);
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteOperator(InsertDeleteOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        setEmptyFDsEqClasses(op, ctx);
+        return null;
+    }
+
+    private void propagateFDsAndEquivClasses(ILogicalOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        ILogicalOperator inp1 = op.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrComputeEqClasses(inp1, ctx);
+        ctx.putEquivalenceClassMap(op, eqClasses);
+        List<FunctionalDependency> fds = getOrComputeFDs(inp1, ctx);
+        ctx.putFDList(op, fds);
+    }
+
+    private Map<LogicalVariable, EquivalenceClass> getOrComputeEqClasses(ILogicalOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> eqClasses = ctx.getEquivalenceClassMap(op);
+        if (eqClasses == null) {
+            op.accept(this, ctx);
+            eqClasses = ctx.getEquivalenceClassMap(op);
+        }
+        return eqClasses;
+    }
+
+    private Map<LogicalVariable, EquivalenceClass> getOrCreateEqClasses(ILogicalOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        Map<LogicalVariable, EquivalenceClass> eqClasses = ctx.getEquivalenceClassMap(op);
+        if (eqClasses == null) {
+            eqClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+            ctx.putEquivalenceClassMap(op, eqClasses);
+        }
+        return eqClasses;
+    }
+
+    private List<FunctionalDependency> getOrComputeFDs(ILogicalOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        List<FunctionalDependency> fds = ctx.getFDList(op);
+        if (fds == null) {
+            op.accept(this, ctx);
+            fds = ctx.getFDList(op);
+        }
+        return fds;
+    }
+
+    /***
+     * Propagated equivalent classes from the child to the current operator, based
+     * on the used variables of the current operator.
+     * 
+     * @param op
+     *            , the current operator
+     * @param ctx
+     *            , the optimization context which keeps track of all equivalent classes.
+     * @param usedVariables
+     *            , used variables.
+     * @throws AlgebricksException
+     */
+    private void propagateFDsAndEquivClassesForUsedVars(ILogicalOperator op, IOptimizationContext ctx,
+            List<LogicalVariable> usedVariables) throws AlgebricksException {
+        ILogicalOperator op2 = op.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrCreateEqClasses(op, ctx);
+        List<FunctionalDependency> fds = new ArrayList<FunctionalDependency>();
+        ctx.putFDList(op, fds);
+
+        Map<LogicalVariable, EquivalenceClass> chldClasses = getOrComputeEqClasses(op2, ctx);
+
+        // Propagate equivalent classes that contain the used variables.
+        for (LogicalVariable v : usedVariables) {
+            EquivalenceClass ec = eqClasses.get(v);
+            if (ec == null) {
+                EquivalenceClass oc = chldClasses.get(v);
+                if (oc == null) {
+                    continue;
+                }
+                List<LogicalVariable> m = new LinkedList<LogicalVariable>();
+                for (LogicalVariable v2 : oc.getMembers()) {
+                    if (usedVariables.contains(v2)) {
+                        m.add(v2);
+                    }
+                }
+                EquivalenceClass nc;
+                if (oc.representativeIsConst()) {
+                    nc = new EquivalenceClass(m, oc.getConstRepresentative());
+                } else if (m.contains(oc.getVariableRepresentative())) {
+                    nc = new EquivalenceClass(m, oc.getVariableRepresentative());
+                } else {
+                    nc = new EquivalenceClass(m, v);
+                }
+                for (LogicalVariable v3 : m) {
+                    eqClasses.put(v3, nc);
+                }
+            }
+        }
+
+        // Propagates equivalent classes that contain expressions that use the used variables.
+        // Note that for the case variable $v is not in the used variables but it is
+        // equivalent to field-access($t, i) and $t is a used variable, the equivalent
+        // class should still be propagated (kept).
+        Set<LogicalVariable> usedVarSet = new HashSet<LogicalVariable>(usedVariables);
+        for (Entry<LogicalVariable, EquivalenceClass> entry : chldClasses.entrySet()) {
+            EquivalenceClass ec = entry.getValue();
+            for (ILogicalExpression expr : ec.getExpressionMembers()) {
+                Set<LogicalVariable> exprUsedVars = new HashSet<LogicalVariable>();
+                expr.getUsedVariables(exprUsedVars);
+                exprUsedVars.retainAll(usedVarSet);
+                // Check if the expression member uses a used variable.
+                if (!exprUsedVars.isEmpty()) {
+                    for (LogicalVariable v : ec.getMembers()) {
+                        eqClasses.put(v, ec);
+                        // If variable members contain a used variable, the representative
+                        // variable should be a used variable.
+                        if (usedVarSet.contains(v)) {
+                            ec.setVariableRepresentative(v);
+                        }
+                    }
+                }
+            }
+        }
+
+        List<FunctionalDependency> chldFds = getOrComputeFDs(op2, ctx);
+        for (FunctionalDependency fd : chldFds) {
+            if (!usedVariables.containsAll(fd.getHead())) {
+                continue;
+            }
+            List<LogicalVariable> tl = new LinkedList<LogicalVariable>();
+            for (LogicalVariable v : fd.getTail()) {
+                if (usedVariables.contains(v)) {
+                    tl.add(v);
+                }
+            }
+            if (!tl.isEmpty()) {
+                FunctionalDependency newFd = new FunctionalDependency(fd.getHead(), tl);
+                fds.add(newFd);
+            }
+        }
+    }
+
+    private void fdsEqClassesForAbstractUnnestOperator(AbstractUnnestOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        ILogicalOperator inp1 = op.getInputs().get(0).getValue();
+        Map<LogicalVariable, EquivalenceClass> eqClasses = getOrCreateEqClasses(op, ctx);
+        Map<LogicalVariable, EquivalenceClass> propagatedEqClasses = getOrComputeEqClasses(inp1, ctx);
+        /**
+         * The original eq classes of unnest-map are only for produced variables, therefore
+         * eqClasses and propagatedEqClasses do not have overlaps.
+         */
+        eqClasses.putAll(propagatedEqClasses);
+        ctx.putEquivalenceClassMap(op, eqClasses);
+        List<FunctionalDependency> fds = getOrComputeFDs(inp1, ctx);
+        ctx.putFDList(op, fds);
+
+        ILogicalExpression expr = op.getExpressionRef().getValue();
+        if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
+            AbstractFunctionCallExpression afe = (AbstractFunctionCallExpression) expr;
+            if (afe.getKind() == FunctionKind.UNNEST && ((UnnestingFunctionCallExpression) afe).returnsUniqueValues()) {
+                List<LogicalVariable> vars = new ArrayList<LogicalVariable>();
+                VariableUtilities.getLiveVariables(op, vars);
+                ArrayList<LogicalVariable> h = new ArrayList<LogicalVariable>();
+                h.addAll(op.getVariables());
+                FunctionalDependency fd = new FunctionalDependency(h, vars);
+                fds.add(fd);
+            }
+        }
+    }
+
+    public static void setEmptyFDsEqClasses(ILogicalOperator op, IOptimizationContext ctx) {
+        Map<LogicalVariable, EquivalenceClass> eqClasses = new HashMap<LogicalVariable, EquivalenceClass>();
+        ctx.putEquivalenceClassMap(op, eqClasses);
+        List<FunctionalDependency> fds = new ArrayList<FunctionalDependency>();
+        ctx.putFDList(op, fds);
+    }
+
+    private LogicalVariable getNewGbyVar(GroupByOperator g, LogicalVariable v) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getGroupByList()) {
+            ILogicalExpression e = p.second.getValue();
+            if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                LogicalVariable v2 = ((VariableReferenceExpression) e).getVariableReference();
+                if (v2 == v) {
+                    return p.first;
+                }
+            }
+        }
+        return null;
+    }
+
+    private LogicalVariable getNewDecorVar(GroupByOperator g, LogicalVariable v) {
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p : g.getDecorList()) {
+            ILogicalExpression e = p.second.getValue();
+            if (e.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+                LogicalVariable v2 = ((VariableReferenceExpression) e).getVariableReference();
+                if (v2 == v) {
+                    return (p.first != null) ? p.first : v2;
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitExtensionOperator(ExtensionOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
+    public Void visitExternalDataLookupOperator(ExternalDataLookupOperator op, IOptimizationContext ctx)
+            throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    /**
+     * Propagate equivalences that carried in expressions to the variables that
+     * they are assigned to.
+     * 
+     * @param eqClasses
+     *            an equivalent class map
+     * @param assignExprs
+     *            expressions on the right-hand-side of assignments
+     * @param assignVars
+     *            variables on the left-hand-side of assignments
+     */
+    private void propagateEquivalenceFromExpressionsToVariables(Map<LogicalVariable, EquivalenceClass> eqClasses,
+            List<Mutable<ILogicalExpression>> assignExprs, List<LogicalVariable> assignVars) {
+        for (int assignVarIndex = 0; assignVarIndex < assignVars.size(); ++assignVarIndex) {
+            LogicalVariable var = assignVars.get(assignVarIndex);
+            ILogicalExpression expr = assignExprs.get(assignVarIndex).getValue();
+            for (Entry<LogicalVariable, EquivalenceClass> entry : eqClasses.entrySet()) {
+                EquivalenceClass eqc = entry.getValue();
+                // If the equivalence class contains the right-hand-side expression,
+                // the left-hand-side variable is added into the equivalence class.
+                if (eqc.contains(expr)) {
+                    eqc.addMember(var);
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
new file mode 100644
index 0000000..b9b111d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -0,0 +1,575 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
+import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
+import edu.uci.ics.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boolean, ILogicalOperator> {
+
+    private final Map<LogicalVariable, LogicalVariable> variableMapping = new HashMap<LogicalVariable, LogicalVariable>();
+
+    public IsomorphismOperatorVisitor() {
+    }
+
+    @Override
+    public Boolean visitAggregateOperator(AggregateOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.AGGREGATE)
+            return Boolean.FALSE;
+        AggregateOperator aggOpArg = (AggregateOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(
+                getPairList(op.getVariables(), op.getExpressions()),
+                getPairList(aggOpArg.getVariables(), aggOpArg.getExpressions()));
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitRunningAggregateOperator(RunningAggregateOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.RUNNINGAGGREGATE)
+            return Boolean.FALSE;
+        RunningAggregateOperator aggOpArg = (RunningAggregateOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(
+                getPairList(op.getVariables(), op.getExpressions()),
+                getPairList(aggOpArg.getVariables(), aggOpArg.getExpressions()));
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) copyAndSubstituteVar(op, arg);
+        if (aop.getOperatorTag() != LogicalOperatorTag.EMPTYTUPLESOURCE)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
+    public Boolean visitExtensionOperator(ExtensionOperator op, ILogicalOperator arg) throws AlgebricksException {
+        ExtensionOperator aop = (ExtensionOperator) copyAndSubstituteVar(op, arg);
+        if (aop.getOperatorTag() != LogicalOperatorTag.EXTENSION_OPERATOR)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
+    public Boolean visitGroupByOperator(GroupByOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        // require the same physical operator, otherwise delivers different data
+        // properties
+        if (aop.getOperatorTag() != LogicalOperatorTag.GROUP
+                || aop.getPhysicalOperator().getOperatorTag() != op.getPhysicalOperator().getOperatorTag())
+            return Boolean.FALSE;
+
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> keyLists = op.getGroupByList();
+        GroupByOperator gbyOpArg = (GroupByOperator) copyAndSubstituteVar(op, arg);
+        List<Pair<LogicalVariable, Mutable<ILogicalExpression>>> keyListsArg = gbyOpArg.getGroupByList();
+
+        List<Pair<LogicalVariable, ILogicalExpression>> listLeft = new ArrayList<Pair<LogicalVariable, ILogicalExpression>>();
+        List<Pair<LogicalVariable, ILogicalExpression>> listRight = new ArrayList<Pair<LogicalVariable, ILogicalExpression>>();
+
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : keyLists)
+            listLeft.add(new Pair<LogicalVariable, ILogicalExpression>(pair.first, pair.second.getValue()));
+        for (Pair<LogicalVariable, Mutable<ILogicalExpression>> pair : keyListsArg)
+            listRight.add(new Pair<LogicalVariable, ILogicalExpression>(pair.first, pair.second.getValue()));
+
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(listLeft, listRight);
+
+        if (!isomorphic)
+            return Boolean.FALSE;
+        int sizeOp = op.getNestedPlans().size();
+        int sizeArg = gbyOpArg.getNestedPlans().size();
+        if (sizeOp != sizeArg)
+            return Boolean.FALSE;
+
+        GroupByOperator argOp = (GroupByOperator) arg;
+        List<ILogicalPlan> plans = op.getNestedPlans();
+        List<ILogicalPlan> plansArg = argOp.getNestedPlans();
+        for (int i = 0; i < plans.size(); i++) {
+            List<Mutable<ILogicalOperator>> roots = plans.get(i).getRoots();
+            List<Mutable<ILogicalOperator>> rootsArg = plansArg.get(i).getRoots();
+            if (roots.size() != rootsArg.size())
+                return Boolean.FALSE;
+            for (int j = 0; j < roots.size(); j++) {
+                ILogicalOperator topOp1 = roots.get(j).getValue();
+                ILogicalOperator topOp2 = rootsArg.get(j).getValue();
+                isomorphic = IsomorphismUtilities.isOperatorIsomorphicPlanSegment(topOp1, topOp2);
+                if (!isomorphic)
+                    return Boolean.FALSE;
+            }
+        }
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitLimitOperator(LimitOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.LIMIT)
+            return Boolean.FALSE;
+        LimitOperator limitOpArg = (LimitOperator) copyAndSubstituteVar(op, arg);
+        if (op.getOffset() != limitOpArg.getOffset())
+            return Boolean.FALSE;
+        boolean isomorphic = op.getMaxObjects().getValue().equals(limitOpArg.getMaxObjects().getValue());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitInnerJoinOperator(InnerJoinOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.INNERJOIN)
+            return Boolean.FALSE;
+        InnerJoinOperator joinOpArg = (InnerJoinOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = op.getCondition().getValue().equals(joinOpArg.getCondition().getValue());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitLeftOuterJoinOperator(LeftOuterJoinOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN)
+            return Boolean.FALSE;
+        LeftOuterJoinOperator joinOpArg = (LeftOuterJoinOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = op.getCondition().getValue().equals(joinOpArg.getCondition().getValue());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitNestedTupleSourceOperator(NestedTupleSourceOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.NESTEDTUPLESOURCE)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
+    public Boolean visitOrderOperator(OrderOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.ORDER)
+            return Boolean.FALSE;
+        OrderOperator orderOpArg = (OrderOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = compareIOrderAndExpressions(op.getOrderExpressions(), orderOpArg.getOrderExpressions());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitAssignOperator(AssignOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.ASSIGN)
+            return Boolean.FALSE;
+        AssignOperator assignOpArg = (AssignOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(
+                getPairList(op.getVariables(), op.getExpressions()),
+                getPairList(assignOpArg.getVariables(), assignOpArg.getExpressions()));
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitSelectOperator(SelectOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.SELECT)
+            return Boolean.FALSE;
+        SelectOperator selectOpArg = (SelectOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = op.getCondition().getValue().equals(selectOpArg.getCondition().getValue());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitProjectOperator(ProjectOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.PROJECT)
+            return Boolean.FALSE;
+        ProjectOperator projectOpArg = (ProjectOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), projectOpArg.getVariables());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.PARTITIONINGSPLIT)
+            return Boolean.FALSE;
+        PartitioningSplitOperator partitionOpArg = (PartitioningSplitOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = compareExpressions(op.getExpressions(), partitionOpArg.getExpressions());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.REPLICATE)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
+    public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.MATERIALIZE)
+            return Boolean.FALSE;
+        return Boolean.TRUE;
+    }
+
+    @Override
+    public Boolean visitScriptOperator(ScriptOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.SCRIPT)
+            return Boolean.FALSE;
+        ScriptOperator scriptOpArg = (ScriptOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = op.getScriptDescription().equals(scriptOpArg.getScriptDescription());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitSubplanOperator(SubplanOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.SUBPLAN)
+            return Boolean.FALSE;
+        SubplanOperator subplanOpArg = (SubplanOperator) copyAndSubstituteVar(op, arg);
+        List<ILogicalPlan> plans = op.getNestedPlans();
+        List<ILogicalPlan> plansArg = subplanOpArg.getNestedPlans();
+        for (int i = 0; i < plans.size(); i++) {
+            List<Mutable<ILogicalOperator>> roots = plans.get(i).getRoots();
+            List<Mutable<ILogicalOperator>> rootsArg = plansArg.get(i).getRoots();
+            if (roots.size() == rootsArg.size())
+                return Boolean.FALSE;
+            for (int j = 0; j < roots.size(); j++) {
+                ILogicalOperator topOp1 = roots.get(j).getValue();
+                ILogicalOperator topOp2 = rootsArg.get(j).getValue();
+                boolean isomorphic = IsomorphismUtilities.isOperatorIsomorphicPlanSegment(topOp1, topOp2);
+                if (!isomorphic)
+                    return Boolean.FALSE;
+            }
+        }
+        return Boolean.TRUE;
+    }
+
+    @Override
+    public Boolean visitUnionOperator(UnionAllOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.UNIONALL)
+            return Boolean.FALSE;
+        UnionAllOperator unionOpArg = (UnionAllOperator) copyAndSubstituteVar(op, arg);
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> mapping = op.getVariableMappings();
+        List<Triple<LogicalVariable, LogicalVariable, LogicalVariable>> mappingArg = unionOpArg.getVariableMappings();
+        if (mapping.size() != mappingArg.size())
+            return Boolean.FALSE;
+        return VariableUtilities.varListEqualUnordered(mapping, mappingArg);
+    }
+
+    @Override
+    public Boolean visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.UNNEST)
+            return Boolean.FALSE;
+        UnnestOperator unnestOpArg = (UnnestOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), unnestOpArg.getVariables())
+                && variableEqual(op.getPositionalVariable(), unnestOpArg.getPositionalVariable());
+        if (!isomorphic)
+            return Boolean.FALSE;
+        isomorphic = op.getExpressionRef().getValue().equals(unnestOpArg.getExpressionRef().getValue());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.UNNEST_MAP)
+            return Boolean.FALSE;
+        UnnestMapOperator unnestOpArg = (UnnestMapOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), unnestOpArg.getVariables());
+        if (!isomorphic)
+            return Boolean.FALSE;
+        isomorphic = op.getExpressionRef().getValue().equals(unnestOpArg.getExpressionRef().getValue());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitDataScanOperator(DataSourceScanOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.DATASOURCESCAN)
+            return Boolean.FALSE;
+        DataSourceScanOperator argScan = (DataSourceScanOperator) arg;
+        if (!argScan.getDataSource().toString().equals(op.getDataSource().toString()))
+            return Boolean.FALSE;
+        DataSourceScanOperator scanOpArg = (DataSourceScanOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getVariables(), scanOpArg.getVariables())
+                && op.getDataSource().toString().equals(scanOpArg.getDataSource().toString());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitDistinctOperator(DistinctOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.DISTINCT)
+            return Boolean.FALSE;
+        DistinctOperator distinctOpArg = (DistinctOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = compareExpressions(op.getExpressions(), distinctOpArg.getExpressions());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitExchangeOperator(ExchangeOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.EXCHANGE)
+            return Boolean.FALSE;
+        // require the same partition property
+        if (!(op.getPhysicalOperator().getOperatorTag() == aop.getPhysicalOperator().getOperatorTag()))
+            return Boolean.FALSE;
+        variableMapping.clear();
+        IsomorphismUtilities.mapVariablesTopDown(op, arg, variableMapping);
+        IPhysicalPropertiesVector properties = op.getPhysicalOperator().getDeliveredProperties();
+        IPhysicalPropertiesVector propertiesArg = aop.getPhysicalOperator().getDeliveredProperties();
+        if (properties == null && propertiesArg == null)
+            return Boolean.TRUE;
+        if (properties == null || propertiesArg == null)
+            return Boolean.FALSE;
+        IPartitioningProperty partProp = properties.getPartitioningProperty();
+        IPartitioningProperty partPropArg = propertiesArg.getPartitioningProperty();
+        if (!partProp.getPartitioningType().equals(partPropArg.getPartitioningType()))
+            return Boolean.FALSE;
+        List<LogicalVariable> columns = new ArrayList<LogicalVariable>();
+        partProp.getColumns(columns);
+        List<LogicalVariable> columnsArg = new ArrayList<LogicalVariable>();
+        partPropArg.getColumns(columnsArg);
+        if (columns.size() != columnsArg.size())
+            return Boolean.FALSE;
+        if (columns.size() == 0)
+            return Boolean.TRUE;
+        for (int i = 0; i < columnsArg.size(); i++) {
+            LogicalVariable rightVar = columnsArg.get(i);
+            LogicalVariable leftVar = variableMapping.get(rightVar);
+            if (leftVar != null)
+                columnsArg.set(i, leftVar);
+        }
+        return VariableUtilities.varListEqualUnordered(columns, columnsArg);
+    }
+
+    @Override
+    public Boolean visitWriteOperator(WriteOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.WRITE)
+            return Boolean.FALSE;
+        WriteOperator writeOpArg = (WriteOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), writeOpArg.getSchema());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitDistributeResultOperator(DistributeResultOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.DISTRIBUTE_RESULT)
+            return Boolean.FALSE;
+        DistributeResultOperator writeOpArg = (DistributeResultOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), writeOpArg.getSchema());
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitWriteResultOperator(WriteResultOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.WRITE_RESULT)
+            return Boolean.FALSE;
+        WriteResultOperator writeOpArg = (WriteResultOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), writeOpArg.getSchema());
+        if (!op.getDataSource().equals(writeOpArg.getDataSource()))
+            isomorphic = false;
+        if (!op.getPayloadExpression().equals(writeOpArg.getPayloadExpression()))
+            isomorphic = false;
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitInsertDeleteOperator(InsertDeleteOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.INSERT_DELETE)
+            return Boolean.FALSE;
+        InsertDeleteOperator insertOpArg = (InsertDeleteOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
+        if (!op.getDataSource().equals(insertOpArg.getDataSource()))
+            isomorphic = false;
+        if (!op.getPayloadExpression().equals(insertOpArg.getPayloadExpression()))
+            isomorphic = false;
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitIndexInsertDeleteOperator(IndexInsertDeleteOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.INDEX_INSERT_DELETE)
+            return Boolean.FALSE;
+        IndexInsertDeleteOperator insertOpArg = (IndexInsertDeleteOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), insertOpArg.getSchema());
+        if (!op.getDataSourceIndex().equals(insertOpArg.getDataSourceIndex()))
+            isomorphic = false;
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitTokenizeOperator(TokenizeOperator op, ILogicalOperator arg) throws AlgebricksException {
+        AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+        if (aop.getOperatorTag() != LogicalOperatorTag.TOKENIZE)
+            return Boolean.FALSE;
+        TokenizeOperator tokenizeOpArg = (TokenizeOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = VariableUtilities.varListEqualUnordered(op.getSchema(), tokenizeOpArg.getSchema());
+        if (!op.getDataSourceIndex().equals(tokenizeOpArg.getDataSourceIndex()))
+            isomorphic = false;
+        return isomorphic;
+    }
+
+    @Override
+    public Boolean visitSinkOperator(SinkOperator op, ILogicalOperator arg) throws AlgebricksException {
+        return true;
+    }
+
+    private Boolean compareExpressions(List<Mutable<ILogicalExpression>> opExprs,
+            List<Mutable<ILogicalExpression>> argExprs) {
+        if (opExprs.size() != argExprs.size())
+            return Boolean.FALSE;
+        for (int i = 0; i < opExprs.size(); i++) {
+            boolean isomorphic = opExprs.get(i).getValue().equals(argExprs.get(i).getValue());
+            if (!isomorphic)
+                return Boolean.FALSE;
+        }
+        return Boolean.TRUE;
+    }
+
+    @Override
+    public Boolean visitExternalDataLookupOperator(ExternalDataLookupOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        return Boolean.FALSE;
+    }
+
+    private Boolean compareIOrderAndExpressions(List<Pair<IOrder, Mutable<ILogicalExpression>>> opOrderExprs,
+            List<Pair<IOrder, Mutable<ILogicalExpression>>> argOrderExprs) {
+        if (opOrderExprs.size() != argOrderExprs.size())
+            return Boolean.FALSE;
+        for (int i = 0; i < opOrderExprs.size(); i++) {
+            boolean isomorphic = opOrderExprs.get(i).first.equals(argOrderExprs.get(i).first);
+            if (!isomorphic)
+                return Boolean.FALSE;
+            isomorphic = opOrderExprs.get(i).second.getValue().equals(argOrderExprs.get(i).second.getValue());
+            if (!isomorphic)
+                return Boolean.FALSE;
+        }
+        return Boolean.TRUE;
+    }
+
+    private ILogicalOperator copyAndSubstituteVar(ILogicalOperator op, ILogicalOperator argOp)
+            throws AlgebricksException {
+        ILogicalOperator newOp = OperatorManipulationUtil.deepCopy(argOp);
+        variableMapping.clear();
+        IsomorphismUtilities.mapVariablesTopDown(op, argOp, variableMapping);
+
+        List<LogicalVariable> liveVars = new ArrayList<LogicalVariable>();
+        if (argOp.getInputs().size() > 0)
+            for (int i = 0; i < argOp.getInputs().size(); i++)
+                VariableUtilities.getLiveVariables(argOp.getInputs().get(i).getValue(), liveVars);
+        List<LogicalVariable> producedVars = new ArrayList<LogicalVariable>();
+        VariableUtilities.getProducedVariables(argOp, producedVars);
+        List<LogicalVariable> producedVarsNew = new ArrayList<LogicalVariable>();
+        VariableUtilities.getProducedVariables(op, producedVarsNew);
+
+        if (producedVars.size() != producedVarsNew.size())
+            return newOp;
+        for (Entry<LogicalVariable, LogicalVariable> map : variableMapping.entrySet()) {
+            if (liveVars.contains(map.getKey())) {
+                VariableUtilities.substituteVariables(newOp, map.getKey(), map.getValue(), null);
+            }
+        }
+        for (int i = 0; i < producedVars.size(); i++)
+            VariableUtilities.substituteVariables(newOp, producedVars.get(i), producedVarsNew.get(i), null);
+        return newOp;
+    }
+
+    public List<Pair<LogicalVariable, ILogicalExpression>> getPairList(List<LogicalVariable> vars,
+            List<Mutable<ILogicalExpression>> exprs) throws AlgebricksException {
+        List<Pair<LogicalVariable, ILogicalExpression>> list = new ArrayList<Pair<LogicalVariable, ILogicalExpression>>();
+        if (vars.size() != exprs.size())
+            throw new AlgebricksException("variable list size does not equal to expression list size ");
+        for (int i = 0; i < vars.size(); i++) {
+            list.add(new Pair<LogicalVariable, ILogicalExpression>(vars.get(i), exprs.get(i).getValue()));
+        }
+        return list;
+    }
+
+    private static boolean variableEqual(LogicalVariable var, LogicalVariable varArg) {
+        if (var == null && varArg == null)
+            return true;
+        if (var.equals(varArg))
+            return true;
+        else
+            return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/9939b48e/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
new file mode 100644
index 0000000..d79853d
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismUtilities.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
+
+public class IsomorphismUtilities {
+
+    public static void mapVariablesTopDown(ILogicalOperator op, ILogicalOperator arg,
+            Map<LogicalVariable, LogicalVariable> variableMapping) throws AlgebricksException {
+        IsomorphismVariableMappingVisitor visitor = new IsomorphismVariableMappingVisitor(variableMapping);
+        op.accept(visitor, arg);
+    }
+
+    public static boolean isOperatorIsomorphic(ILogicalOperator op, ILogicalOperator arg) throws AlgebricksException {
+        IsomorphismOperatorVisitor visitor = new IsomorphismOperatorVisitor();
+        return op.accept(visitor, arg).booleanValue();
+    }
+
+    public static boolean isOperatorIsomorphicPlanSegment(ILogicalOperator op, ILogicalOperator arg)
+            throws AlgebricksException {
+        List<Mutable<ILogicalOperator>> inputs1 = op.getInputs();
+        List<Mutable<ILogicalOperator>> inputs2 = arg.getInputs();
+        if (inputs1.size() != inputs2.size())
+            return Boolean.FALSE;
+        for (int i = 0; i < inputs1.size(); i++) {
+            ILogicalOperator input1 = inputs1.get(i).getValue();
+            ILogicalOperator input2 = inputs2.get(i).getValue();
+            boolean isomorphic = isOperatorIsomorphicPlanSegment(input1, input2);
+            if (!isomorphic)
+                return Boolean.FALSE;
+        }
+        return IsomorphismUtilities.isOperatorIsomorphic(op, arg);
+    }
+
+    public static boolean isOperatorIsomorphicPlan(ILogicalPlan plan, ILogicalPlan arg) throws AlgebricksException {
+        if (plan.getRoots().size() != arg.getRoots().size()) {
+            return false;
+        }
+        for (int i = 0; i < plan.getRoots().size(); i++) {
+            ILogicalOperator topOp1 = plan.getRoots().get(i).getValue();
+            ILogicalOperator topOp2 = arg.getRoots().get(i).getValue();
+            if (!IsomorphismUtilities.isOperatorIsomorphicPlanSegment(topOp1, topOp2)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}