You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/04/21 01:03:11 UTC

svn commit: r766907 - in /hadoop/pig/trunk: src/org/apache/pig/impl/plan/OperatorPlan.java test/org/apache/pig/test/TestBestFitCast.java test/org/apache/pig/test/TestOperatorPlan.java

Author: sms
Date: Mon Apr 20 23:03:10 2009
New Revision: 766907

URL: http://svn.apache.org/viewvc?rev=766907&view=rev
Log:
PIG-693: Proposed improvements to pig's optimizer

Modified:
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=766907&r1=766906&r2=766907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Apr 20 23:03:10 2009
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.pig.PigException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -416,7 +417,7 @@
      * @param newNode new node to insert.  This node must have already been
      * added to the plan.
      * @param before Node to insert this node before
-     * @throws PlanException if it encounters trouble disconecting or
+     * @throws PlanException if it encounters trouble disconnecting or
      * connecting nodes.
      */
     public void insertBetween(
@@ -436,20 +437,48 @@
 
     // replaces (src -> dst) entry in multiMap with (src -> replacement)
     private boolean replaceNode(E src, E replacement, E dst, MultiMap<E, E> multiMap) {
-        Collection c = multiMap.get(src);
-        if (c == null) return false;
-
-        ArrayList al = new ArrayList(c);
-        for(int i = 0; i < al.size(); ++i) {
-            E to = (E)al.get(i);
-            if(to.equals(dst)) {
-                al.set(i, replacement);
-                multiMap.removeKey(src);
-                multiMap.put(src, al);
+        if(multiMap == null) return false;
+        
+        if(src == null) return false;
+      
+        List<E> nodes = (ArrayList<E>)multiMap.get(src);
+        if (nodes == null) {
+            //we need to add replacement to the multimap as long as replacement != null
+            if(replacement == null) {
+                return false;
+            } else if (dst == null) {
+                ArrayList<E> replacementNodes = new ArrayList<E>();
+                replacementNodes.add(replacement);
+                multiMap.put(src, replacementNodes);
                 return true;
+            } else {
+                return false;
+            }
+        }
+        
+        if(dst == null) return false;
+        
+        boolean replaced = false;
+        ArrayList<E> replacementNodes = new ArrayList<E>();
+        for(int i = 0; i < nodes.size(); ++i) {
+            E to = nodes.get(i);
+            if(to.equals(dst)) {
+                replaced = true;
+                if(replacement != null) {
+                    replacementNodes.add(replacement);
+                }
+            } else {
+                replacementNodes.add(to);
+            }
+        }
+        
+        if(replaced) {
+            multiMap.removeKey(src);
+            if(replacementNodes.size() > 0) {
+                multiMap.put(src, replacementNodes);
             }
         }
-        return false;
+        return replaced;
     }
 
     /**
@@ -536,20 +565,7 @@
         if (pred != null && succ != null) connect(pred, succ);
     }
 
-    /**
-     * Remove a node in a way that connects the node's predecessor (if any)
-     * with the node's successors (if any).  This function handles the
-     * case where the node has *one* predecessor and one or more successors.
-     * It replaces the predecessor in same position as node was in
-     * each of the successors predecessor list(getPredecessors()), to 
-     * preserve input ordering 
-     * for eg, it is used to remove redundant project(*) from plan
-     * which will have only one predecessor,but can have multiple success
-     * @param node Node to be removed
-     * @throws PlanException if the node has more than one predecessor
-     */
-    public void removeAndReconnectMultiSucc(E node) throws PlanException {
-
+    private void reconnectSuccessors(E node, boolean successorRequired, boolean removeNode) throws PlanException {
         // Before:
         //    A (predecessor (only one) )
         //  / |
@@ -563,7 +579,7 @@
         //  X  C1 C2 C3 ...
         // the variable names are from above example
 
-    	E nodeB = node;
+        E nodeB = node;
         List<E> preds = getPredecessors(nodeB);
         //checking pre-requisite conditions
         if (preds == null || preds.size() != 1) {
@@ -583,57 +599,186 @@
         Collection<E> nodeC = mFromEdges.get(nodeB);
 
         //checking pre-requisite conditions
-        if (nodeC == null || nodeC.size() == 0) {
-            PlanException pe = new PlanException("Attempt to remove " +
-            " and reconnect for node with no successors.");
-            log.error(pe.getMessage());
-            throw pe;
-        }   
+        if(successorRequired) {
+            if (nodeC == null || nodeC.size() == 0) {
+                PlanException pe = new PlanException("Attempt to remove " +
+                " and reconnect for node with no successors.");
+                log.error(pe.getMessage());
+                throw pe;
+            }
+        }
 
 
         // replace B in A.succesors and add B.successors(ie C) to it
         replaceAndAddSucessors(nodeA, nodeB);
         
         // for all C(succs) , replace B(node) in predecessors, with A(pred)
-        for(E c: nodeC) {
-            Collection<E> sPreds = mToEdges.get(c);
-            ArrayList<E> newPreds = new ArrayList<E>(sPreds.size());
-            for(E p: sPreds){
-                if(p == nodeB){
-                    //replace
-                    newPreds.add(nodeA);
+        if(nodeC != null) {
+            for(E c: nodeC) {
+                Collection<E> sPreds = mToEdges.get(c);
+                ArrayList<E> newPreds = new ArrayList<E>(sPreds.size());
+                for(E p: sPreds){
+                    if(p == nodeB){
+                        //replace
+                        newPreds.add(nodeA);
+                    }
+                    else{
+                        newPreds.add(p);
+                    }
                 }
-                else{
-                    newPreds.add(p);
+                mToEdges.removeKey(c);
+                mToEdges.put(c,newPreds);
+                
+            }
+        }
+        
+        if(removeNode) {
+            remove(nodeB);
+        } else {
+            //make sure that the node does not have any dangling from and to edges
+            mFromEdges.removeKey(nodeB);
+            mToEdges.removeKey(nodeB);
+        }
+    }
+    
+    private void reconnectPredecessors(E node, boolean predecessorRequired, boolean removeNode) throws PlanException {
+        // Before:
+        // C1 C2  C3 ... (Predecessors)
+        //  \ |  /    \
+        // X  B(nodeB)  Y(some successor of a Cn)
+        //  \ |
+        //    A (successor (only one) )
+ 
+
+        // should become
+        // After:
+        //  X  C1 C2 C3 ...
+        //   \  \ | /  \
+        //        A     Y
+        // the variable names are from above example
+
+        E nodeB = node;
+        List<E> nodeBsuccessors = getSuccessors(nodeB);
+        //checking pre-requisite conditions
+        if (nodeBsuccessors == null || nodeBsuccessors.size() != 1) {
+            Integer size = null;
+            if(nodeBsuccessors != null)
+                size = nodeBsuccessors.size();
+
+            PlanException pe = new PlanException("Attempt to remove " +
+                    " and reconnect for node with  " + size +
+            " successors.");
+            log.error(pe.getMessage());
+            throw pe;
+        }
+
+        //A and C
+        E nodeA = nodeBsuccessors.get(0);
+        Collection<E> nodeC = mToEdges.get(nodeB);
+
+        //checking pre-requisite conditions
+        if(predecessorRequired) {
+            if (nodeC == null || nodeC.size() == 0) {
+                PlanException pe = new PlanException("Attempt to remove " +
+                " and reconnect for node with no predecessors.");
+                log.error(pe.getMessage());
+                throw pe;
+            }
+        }
+
+
+        // replace B in A.predecessors and add B.predecessors(ie C) to it
+        replaceAndAddPredecessors(nodeA, nodeB);
+        
+        // for all C(predecessors) , replace B(node) in successors, with A(successor)
+        if(nodeC != null) {
+            for(E c: nodeC) {
+                Collection<E> sPreds = mFromEdges.get(c);
+                ArrayList<E> newPreds = new ArrayList<E>(sPreds.size());
+                for(E p: sPreds){
+                    if(p == nodeB){
+                        //replace
+                        newPreds.add(nodeA);
+                    }
+                    else{
+                        newPreds.add(p);
+                    }
                 }
+                mFromEdges.removeKey(c);
+                mFromEdges.put(c,newPreds);
+                
             }
-            mToEdges.removeKey(c);
-            mToEdges.put(c,newPreds);
-            
         }
-        remove(nodeB); 
+        
+        if(removeNode) {
+            remove(nodeB);
+        } else {
+            //make sure that the node does not have any dangling from and to edges
+            mFromEdges.removeKey(nodeB);
+            mToEdges.removeKey(nodeB);
+        }
     }
     
-    //removes entry  for succ in list of successors of nd, and adds successors
-    // of succ in its place
-    // @param nd - parent node whose entry for succ needs to be replaced
-    // @param succ - see above
-    private void replaceAndAddSucessors(E nd, E succ) throws PlanException {
-       Collection<E> oldSuccs = mFromEdges.get(nd);
-       Collection<E> repSuccs = mFromEdges.get(succ);
-       ArrayList<E> newSuccs = new ArrayList<E>(oldSuccs.size() - 1 + repSuccs.size() );
-       for(E s: oldSuccs){
-           if(s == succ){
-               newSuccs.addAll(repSuccs);
+    // removes entry for successor in list of successors of node
+    // and adds successors of successor in its place
+    // @param noded - parent node whose entry for successor needs to be replaced
+    // @param successor - see above
+    private void replaceAndAddSucessors(E node, E successor) throws PlanException {
+       Collection<E> oldSuccessors = mFromEdges.get(node);
+       Collection<E> replacementSuccessors = mFromEdges.get(successor);
+       ArrayList<E> newSuccessors = new ArrayList<E>();
+       for(E s: oldSuccessors){
+           if(s == successor){
+               if(replacementSuccessors != null) {
+                   newSuccessors.addAll(replacementSuccessors);
+               }
+           }else{
+               newSuccessors.add(s);
+           }
+       }
+       mFromEdges.removeKey(node);
+       mFromEdges.put(node,newSuccessors);
+    }    
+
+    // removes entry  for predecessor in list of predecessors of node, 
+    // and adds predecessors of predecessor in its place
+    // @param node - parent node whose entry for predecessor needs to be replaced
+    // @param predecessor - see above
+    private void replaceAndAddPredecessors(E node, E predecessor) throws PlanException {
+       Collection<E> oldPredecessors = mToEdges.get(node);
+       Collection<E> replacementPredecessors = mToEdges.get(predecessor);
+       ArrayList<E> newPredecessors = new ArrayList<E>();
+       for(E p: oldPredecessors){
+           if(p == predecessor){
+               if(replacementPredecessors != null) {
+                   newPredecessors.addAll(replacementPredecessors);
+               }
            }else{
-               newSuccs.add(s);
+               newPredecessors.add(p);
            }
        }
-       mFromEdges.removeKey(nd);
-       mFromEdges.put(nd,newSuccs);
+       mToEdges.removeKey(node);
+       mToEdges.put(node,newPredecessors);
+    }
+    
+    /**
+     * Remove a node in a way that connects the node's predecessor (if any)
+     * with the node's successors (if any).  This function handles the
+     * case where the node has *one* predecessor and one or more successors.
+     * It replaces the predecessor in same position as node was in
+     * each of the successors predecessor list(getPredecessors()), to 
+     * preserve input ordering 
+     * for eg, it is used to remove redundant project(*) from plan
+     * which will have only one predecessor,but can have multiple success
+     * @param node Node to be removed
+     * @throws PlanException if the node has more than one predecessor
+     */
+    public void removeAndReconnectMultiSucc(E node) throws PlanException {
+        reconnectSuccessors(node, true, true);
     }
     
 
+    
     public void dump(PrintStream ps) {
         ps.println("Ops");
         for (E op : mOps.keySet()) {
@@ -660,5 +805,405 @@
         pp.print(out);
     }
 
+    /**
+     * Swap two operators in a plan.  Both of the operators must have single
+     * inputs and single outputs.
+     * @param first operator
+     * @param second operator
+     * @throws PlanException if either operator is not single input and output.
+     */
+    public void swap(E first, E second) throws PlanException {
+        E firstNode = first;
+        E secondNode = second;
+        
+        if(firstNode == null) {
+            int errCode = 1092;
+            String msg = "First operator in swap is null. Cannot swap null operators.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        if(secondNode == null) {
+            int errCode = 1092;
+            String msg = "Second operator in swap is null. Cannot swap null operators.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        checkInPlan(firstNode);
+        checkInPlan(secondNode);
+        
+        List<E> firstNodePredecessors = (ArrayList<E>)mToEdges.get(firstNode);
+        
+        if(firstNodePredecessors != null && firstNodePredecessors.size() > 1) {
+            int errCode = 1093;
+            String msg = "Swap supports swap of operators with at most one input."
+                            + " Found first operator with " + firstNodePredecessors.size() + " inputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        List<E> firstNodeSuccessors = (ArrayList<E>)mFromEdges.get(firstNode);
+        
+        if(firstNodeSuccessors != null && firstNodeSuccessors.size() > 1) {
+            int errCode = 1093;
+            String msg = "Swap supports swap of operators with at most one output."
+                + " Found first operator with " + firstNodeSuccessors.size() + " outputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        List<E> secondNodePredecessors = (ArrayList<E>)mToEdges.get(secondNode);
+        
+        if(secondNodePredecessors != null && secondNodePredecessors.size() > 1) {
+            int errCode = 1093;
+            String msg = "Swap supports swap of operators with at most one input."
+                + " Found second operator with " + secondNodePredecessors.size() + " inputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        List<E> secondNodeSuccessors = (ArrayList<E>)mFromEdges.get(secondNode);
+        
+        if(secondNodeSuccessors != null && secondNodeSuccessors.size() > 1) {
+            int errCode = 1093;
+            String msg = "Swap supports swap of operators with at most one output."
+                + " Found second operator with " + secondNodeSuccessors.size() + " outputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        E firstNodePredecessor = null;
+        E firstNodeSuccessor = null;
+        E secondNodePredecessor = null;
+        E secondNodeSuccessor = null;
+        
+        if(firstNodePredecessors != null) {
+            firstNodePredecessor = firstNodePredecessors.get(0);
+        }
+        
+        if(firstNodeSuccessors != null) {
+            firstNodeSuccessor = firstNodeSuccessors.get(0);
+        }
+
+        if(secondNodePredecessors != null) {
+            secondNodePredecessor = secondNodePredecessors.get(0);
+        }
+        
+        if(secondNodeSuccessors != null) {
+            secondNodeSuccessor = secondNodeSuccessors.get(0);
+        }
+        
+        boolean immediateNodes = false;
+        
+        if((firstNodeSuccessor == secondNode) && (secondNodePredecessor == firstNode)) {
+            immediateNodes = true;
+        } else if ((secondNodeSuccessor == firstNode) && (firstNodePredecessor == secondNode)) {
+            immediateNodes = true;
+            //swap the firstNode and secondNode
+            E tmpNode = firstNode;
+            firstNode = secondNode;
+            secondNode = tmpNode;
+            
+            //swap the predecessor and successor nodes
+            tmpNode = firstNodePredecessor;
+            firstNodePredecessor = secondNodePredecessor;
+            secondNodePredecessor = tmpNode;
+            
+            tmpNode = firstNodeSuccessor;
+            firstNodeSuccessor = secondNodeSuccessor;
+            secondNodeSuccessor = tmpNode;
+        }
+
+        if(immediateNodes) {
+            //Replace the predecessors and successors of first and second in their respective edge lists       
+            replaceNode(firstNode, secondNodeSuccessor, firstNodeSuccessor, mFromEdges);
+            replaceNode(firstNode, secondNode, firstNodePredecessor, mToEdges);
+            replaceNode(secondNode, firstNode, secondNodeSuccessor, mFromEdges);
+            replaceNode(secondNode, firstNodePredecessor, secondNodePredecessor, mToEdges);
+        } else {
+            //Replace the predecessors and successors of first and second in their respective edge lists       
+            replaceNode(firstNode, secondNodeSuccessor, firstNodeSuccessor, mFromEdges);
+            replaceNode(firstNode, secondNodePredecessor, firstNodePredecessor, mToEdges);
+            replaceNode(secondNode, firstNodeSuccessor, secondNodeSuccessor, mFromEdges);
+            replaceNode(secondNode, firstNodePredecessor, secondNodePredecessor, mToEdges);
+        }
+
+        //Replace first with second in the edges list for first's predecessor and successor        
+        replaceNode(firstNodePredecessor, secondNode, firstNode, mFromEdges);
+        replaceNode(firstNodeSuccessor, secondNode, firstNode, mToEdges);
+        
+        //Replace second with first in the edges list for second's predecessor and successor
+        replaceNode(secondNodePredecessor, firstNode, secondNode, mFromEdges);
+        replaceNode(secondNodeSuccessor, firstNode, secondNode, mToEdges);
+        
+        markDirty();
+    }
+
+    /**
+     * Push one operator in front of another.  This function is for use when
+     * the first operator has multiple inputs.  The caller can specify
+     * which input of the first operator the second operator should be pushed to.
+     * @param first operator, assumed to have multiple inputs.
+     * @param second operator, will be pushed in front of first
+     * @param inputNum indicates which input of the first operator the second
+     * operator will be pushed onto.  Numbered from 0.
+     * @throws PlanException if inputNum does not exist for first operator
+     */
+    public void pushBefore(E first, E second, int inputNum) throws PlanException {
+        E firstNode = first;
+        E secondNode = second;
+        
+        if(firstNode == null) {
+            int errCode = 1085;
+            String msg = "First operator in pushBefore is null. Cannot pushBefore null operators.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        if(secondNode == null) {
+            int errCode = 1085;
+            String msg = "Second operator in pushBefore is null. Cannot pushBefore null operators.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        checkInPlan(firstNode);
+        checkInPlan(secondNode);
+        
+        List<E> firstNodePredecessors = (ArrayList<E>)mToEdges.get(firstNode);
+        
+        if(firstNodePredecessors == null || firstNodePredecessors.size() <= 1) {
+            int size = (firstNodePredecessors == null ? 0 : firstNodePredecessors.size());
+            int errCode = 1086;
+            String msg = "First operator in pushBefore should have multiple inputs."
+                            + " Found first operator with " + size + " inputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        if(inputNum >= firstNodePredecessors.size()) {
+            int errCode = 1087;
+            String msg = "The inputNum " + inputNum + " should be lesser than the number of inputs of the first operator."
+                            + " Found first operator with " + firstNodePredecessors.size() + " inputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        List<E> firstNodeSuccessors = (ArrayList<E>)mFromEdges.get(firstNode);
+        
+        if(firstNodeSuccessors == null) {
+            int errCode = 1088;
+            String msg = "First operator in pushBefore should have at least one output."
+                + " Found first operator with no outputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        List<E> secondNodePredecessors = (ArrayList<E>)mToEdges.get(secondNode);
+        
+        if(secondNodePredecessors == null || secondNodePredecessors.size() > 1) {
+            int size = (secondNodePredecessors == null ? 0 : secondNodePredecessors.size());
+            int errCode = 1088;
+            String msg = "Second operator in pushBefore should have one input."
+                + " Found second operator with " + size + " inputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        List<E> secondNodeSuccessors = (ArrayList<E>)mFromEdges.get(secondNode);
+        
+        //check for multiple edges from first to second
+        int edgesFromFirstToSecond = 0;
+        for(E node: firstNodeSuccessors) {
+            if(node == secondNode) {
+                ++edgesFromFirstToSecond;
+            }
+        }
+        
+        if(edgesFromFirstToSecond == 0) {
+            int errCode = 1089;
+            String msg = "Second operator in pushBefore should be the successor of the First operator.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        } else if (edgesFromFirstToSecond > 1) {
+            int errCode = 1090;
+            String msg = "Second operator can have at most one incoming edge from First operator."
+                + " Found " + edgesFromFirstToSecond + " edges.";
+            throw new PlanException(msg, errCode, PigException.INPUT);            
+        }
+        
+        //check if E (i.e., firstNode) can support multiple outputs before we short-circuit
+        
+        if(!firstNode.supportsMultipleOutputs()) {
+            int numSecondNodeSuccessors = (secondNodeSuccessors == null? 0 : secondNodeSuccessors.size());
+            if((firstNodeSuccessors.size() > 0) || (numSecondNodeSuccessors > 0)) {
+                int errCode = 1091;
+                String msg = "First operator does not support multiple outputs."
+                    + " On completing the pushBefore operation First operator will end up with "
+                    + (firstNodeSuccessors.size() + numSecondNodeSuccessors) + " edges.";
+                throw new PlanException(msg, errCode, PigException.INPUT);
+            }
+        }
+        
+        //Assume that we have a graph which is like
+        //   A   B   C   D
+        //   \   |   |  /
+        //         E
+        //      /  |  \
+        //     F   G   H
+        //      /  |  \
+        //     I   J   K
+        //
+        //Now pushBefore(E, G, 1)
+        //This can be done using the following sequence of transformations
+        //1. Promote G's successors as E's successors using reconnectSuccessors(G)
+        //2. Insert G between B and E using insertBetween(B, G, E)
+        //The graphs after each step
+        //Step 1 - Note that G is standing alone
+        //   A   B   C   D   G
+        //   \   |   |  /
+        //         E
+        //   /  /  |  \  \
+        //  F  I   J   K  H  
+        //Step 2
+        //       B
+        //       |
+        //   A   G   C   D
+        //   \   |   |  /
+        //         E
+        //   /  /  |  \  \
+        //  F  I   J   K  H           
+        
+        reconnectSuccessors(secondNode, false, false);
+        insertBetween(firstNodePredecessors.get(inputNum), secondNode, firstNode);
+
+        markDirty();
+        return;
+    }
+
+    /**
+     * Push one operator after another.  This function is for use when the second
+     * operator has multiple outputs.  The caller can specify which output of the
+     * second operator the first operator should be pushed to.
+     * @param first operator, assumed to have multiple outputs
+     * @param second operator, will be pushed after the first operator
+     * @param outputNum indicates which output of the first operator the second 
+     * operator will be pushed onto.  Numbered from 0.
+     * @throws PlanException if outputNum does not exist for first operator
+     */
+    public void pushAfter(E first, E second, int outputNum) throws PlanException {
+        E firstNode = first;
+        E secondNode = second;
+        
+        if(firstNode == null) {
+            int errCode = 1085;
+            String msg = "First operator in pushAfter is null. Cannot pushBefore null operators.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        if(secondNode == null) {
+            int errCode = 1085;
+            String msg = "Second operator in pushAfter is null. Cannot pushBefore null operators.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        checkInPlan(firstNode);
+        checkInPlan(secondNode);
+        
+        List<E> firstNodePredecessors = (ArrayList<E>)mToEdges.get(firstNode);
+
+        if(firstNodePredecessors == null) {
+            int errCode = 1088;
+            String msg = "First operator in pushAfter should have at least one input."
+                + " Found first operator with no inputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }        
+
+        List<E> firstNodeSuccessors = (ArrayList<E>)mFromEdges.get(firstNode);
+        
+        if(firstNodeSuccessors == null || firstNodeSuccessors.size() <= 1) {
+            int size = (firstNodeSuccessors == null ? 0 : firstNodeSuccessors.size());
+            int errCode = 1086;
+            String msg = "First operator in pushAfter should have multiple outputs."
+                            + " Found first operator with " + size + " outputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        if(outputNum >= firstNodeSuccessors.size()) {
+            int errCode = 1087;
+            String msg = "The outputNum " + outputNum + " should be lesser than the number of outputs of the first operator."
+                            + " Found first operator with " + firstNodeSuccessors.size() + " outputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+        List<E> secondNodePredecessors = (ArrayList<E>)mToEdges.get(secondNode);
+        
+        List<E> secondNodeSuccessors = (ArrayList<E>)mFromEdges.get(secondNode);
+
+        if(secondNodeSuccessors == null || secondNodeSuccessors.size() > 1) {
+            int size = (secondNodeSuccessors == null ? 0 : secondNodeSuccessors.size());
+            int errCode = 1088;
+            String msg = "Second operator in pushAfter should have one output."
+                + " Found second operator with " + size + " outputs.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        }
+        
+
+        //check for multiple edges from second to first
+        int edgesFromSecondToFirst = 0;
+        for(E node: secondNodeSuccessors) {
+            if(node == firstNode) {
+                ++edgesFromSecondToFirst;
+            }
+        }
+        
+        if(edgesFromSecondToFirst == 0) {
+            int errCode = 1089;
+            String msg = "Second operator in pushAfter should be the predecessor of the First operator.";
+            throw new PlanException(msg, errCode, PigException.INPUT);
+        } else if (edgesFromSecondToFirst > 1) {
+            int errCode = 1090;
+            String msg = "Second operator can have at most one outgoing edge from First operator."
+                + " Found " + edgesFromSecondToFirst + " edges.";
+            throw new PlanException(msg, errCode, PigException.INPUT);            
+        }
+        
+        //check if E (i.e., firstNode) can support multiple outputs before we short-circuit
+        
+        if(!firstNode.supportsMultipleInputs()) {
+            int numSecondNodePredecessors = (secondNodePredecessors == null? 0 : secondNodePredecessors.size());
+            if((firstNodePredecessors.size() > 0) || (numSecondNodePredecessors > 0)) {
+                int errCode = 1091;
+                String msg = "First operator does not support multiple inputs."
+                    + " On completing the pushAfter operation First operator will end up with "
+                    + (firstNodePredecessors.size() + numSecondNodePredecessors) + " edges.";
+                throw new PlanException(msg, errCode, PigException.INPUT);
+            }
+        }
+        
+        //Assume that we have a graph which is like
+        //   A   B   C   D
+        //   \   |   |  /
+        //         E
+        //         |
+        //         G
+        //      /  |  \
+        //     I   J   K
+        //
+        //Now pushAfter(G, E, 1)
+        //This can be done using the following sequence of transformations
+        //1. Promote E's predecessors as G's predecessors using reconnectPredecessors(E)
+        //2. Insert E between G and J using insertBetween(G, E, J)
+        //The graphs after each step
+        //Step 1 - Note that E is standing alone
+        //   A   B   C   D   E
+        //   \   |   |  /
+        //         G
+        //      /  |  \
+        //     I   J   K  
+        //Step 2
+        //   A   B   C   D 
+        //   \   |   |  /
+        //         G
+        //      /  |  \
+        //     I   E   K
+        //         |
+        //         J
+        
+        reconnectPredecessors(secondNode, false, false);
+        insertBetween(firstNode, secondNode, firstNodeSuccessors.get(outputNum));
+        
+        markDirty();
+        return;
+
+    }
 
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java?rev=766907&r1=766906&r2=766907&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java Mon Apr 20 23:03:10 2009
@@ -43,9 +43,6 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import com.sun.org.apache.bcel.internal.ExceptionConstants;
-import com.sun.org.apache.xerces.internal.impl.xpath.regex.ParseException;
-
 import junit.framework.TestCase;
 
 public class TestBestFitCast extends TestCase {

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java?rev=766907&r1=766906&r2=766907&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java Mon Apr 20 23:03:10 2009
@@ -78,7 +78,7 @@
 
         @Override
         public void visit(PlanVisitor v) throws VisitorException {
-            ((TVisitor)v).visitSingleOperator(this);
+            ((TVisitor)v).visit(this);
         }
 
         public String name() {
@@ -102,7 +102,7 @@
         }
 
         public void visit(PlanVisitor v) throws VisitorException {
-            ((TVisitor)v).visitMultiOperator(this);
+            ((TVisitor)v).visit(this);
         }
 
         public String name() {
@@ -111,6 +111,56 @@
         }
 
     }
+    
+    class MultiInputSingleOutputOperator extends TOperator {
+        MultiInputSingleOutputOperator(String name) {
+            super(name);
+        }
+
+        public boolean supportsMultipleInputs() {
+            return true;
+        }
+
+        public boolean supportsMultipleOutputs() {
+            return false;
+        }
+
+        @Override
+        public void visit(PlanVisitor v) throws VisitorException {
+            ((TVisitor)v).visit(this);
+        }
+
+        public String name() {
+            //return this.getClass().getName() + " " + mName
+            return mName;
+        }
+
+    }
+    
+    class MultiOutputSingleInputOperator extends TOperator {
+        MultiOutputSingleInputOperator(String name) {
+            super(name);
+        }
+
+        public boolean supportsMultipleInputs() {
+            return false;
+        }
+
+        public boolean supportsMultipleOutputs() {
+            return true;
+        }
+
+        @Override
+        public void visit(PlanVisitor v) throws VisitorException {
+            ((TVisitor)v).visit(this);
+        }
+
+        public String name() {
+            //return this.getClass().getName() + " " + mName
+            return mName;
+        }
+
+    }
 
     class TPlan extends OperatorPlan<TOperator> {
 
@@ -166,15 +216,25 @@
             mJournal = new StringBuilder();
         }
 
-        public void visitSingleOperator(SingleOperator so) throws VisitorException {
+        public void visit(SingleOperator so) throws VisitorException {
             mJournal.append(so.name());
             mJournal.append(' ');
         }
 
-        public void visitMultiOperator(MultiOperator mo) throws VisitorException {
+        public void visit(MultiOperator mo) throws VisitorException {
             mJournal.append(mo.name());
             mJournal.append(' ');
         }
+        
+        public void visit(MultiInputSingleOutputOperator miso) throws VisitorException {
+            mJournal.append(miso.name());
+            mJournal.append(' ');
+        }
+        
+        public void visit(MultiOutputSingleInputOperator mosi) throws VisitorException {
+            mJournal.append(mosi.name());
+            mJournal.append(' ');
+        }
 
         public String getJournal() {
             return mJournal.toString();
@@ -1760,5 +1820,1125 @@
         assertFalse(neverTransform.mTransformed);
         assertTrue(neverTransform.getNumberOfChecks() == 0);
     }
+    
+    //Swap two roots in a graph. Both the roots are disconnected
+    //and are the only nodes in the graph
+    @Test
+    public void testSwapRootsInDisconnectedGraph() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[2];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+        
+        plan.swap(ops[0], ops[1]);
+        
+        List<TOperator> roots = (ArrayList<TOperator>)plan.getRoots();
+        for(int i = 0; i < roots.size(); ++i) {
+            assertEquals(roots.get(i), ops[i]);
+        }
+    }
+    
+    //Swap two nodes in a graph.
+    //Input
+    //S1->S2
+    //Ouput
+    //S2->S1
+    //Swap again
+    //Output
+    //S1->S2
+    @Test
+    public void testSimpleSwap() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[2];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+        
+        plan.connect(ops[0], ops[1]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+        
+        plan.swap(ops[0], ops[1]);
+        
+        //planPrinter.visit();
+        
+        List<TOperator> roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[1]);
+        
+        List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[0]);
+        
+        List<TOperator> leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[0]);
+        
+        List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[1]);
+        
+        plan.swap(ops[0], ops[1]);
+        
+        //planPrinter.visit();
+        
+        roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[0]);
+        
+        rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[1]);
+        
+        leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[1]);
+        
+        leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[0]);
+    }
+    
+    //Swap two nodes in a graph.
+    //Swap S1 and S3
+    //Input
+    //S1->S2->S3
+    //Intermediate Output
+    //S3->S2->S1
+    //Output
+    //S1->S2->S3
+    @Test
+    public void testSimpleSwap2() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+        
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+        
+        plan.swap(ops[0], ops[2]);
+
+        //planPrinter.visit();
+        
+        List<TOperator> roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[2]);
+        
+        List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[1]);
+        
+        List<TOperator> leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[0]);
+        
+        List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[1]);
+        
+        plan.swap(ops[0], ops[2]);
+        
+        //planPrinter.visit();
+        
+        roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[0]);
+        
+        rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[1]);
+        
+        leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[2]);
+        
+        leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[1]);
+    }
+    
+    //Swap two nodes in a graph and then swap it back again.
+    //Swap S2 and S3
+    //Input
+    //S1->S2->S3
+    //Intermediate Output
+    //S1->S3->S2
+    //Output
+    //S1->S2->S3
+    @Test
+    public void testSimpleSwap3() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+        
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        plan.swap(ops[1], ops[2]);
+        
+        //planPrinter.visit();
+
+        List<TOperator> roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[0]);
+        
+        List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[2]);
+        
+        List<TOperator> leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[1]);
+        
+        List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[2]);
+        
+        plan.swap(ops[1], ops[2]);
+        
+        //planPrinter.visit();
+
+        roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[0]);
+        
+        rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[1]);
+        
+        leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[2]);
+        
+        leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[1]);
+    }
+    
+    //Swap two nodes in a graph and then swap it back again.
+    //Swap S1 and S2
+    //Input
+    //S1->S2->S3
+    //Intermediate Output
+    //S2->S1->S3
+    //Output
+    //S1->S2->S3
+    @Test
+    public void testSimpleSwap4() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[3];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+            plan.add(ops[i]);
+        }
+        
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        plan.swap(ops[0], ops[1]);
+
+        //planPrinter.visit();
+        
+        List<TOperator> roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[1]);
+        
+        List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[0]);
+        
+        List<TOperator> leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[2]);
+        
+        List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[0]);
+        
+        plan.swap(ops[0], ops[1]);
+
+        //planPrinter.visit();
+        
+        roots = (ArrayList<TOperator>)plan.getRoots();
+        assertEquals(roots.get(0), ops[0]);
+        
+        rootSuccessors = plan.getSuccessors(roots.get(0));
+        assertEquals(rootSuccessors.get(0), ops[1]);
+        
+        leaves = (ArrayList<TOperator>)plan.getLeaves();
+        assertEquals(leaves.get(0), ops[2]);
+        
+        leafPredecessors = plan.getPredecessors(leaves.get(0));
+        assertEquals(leafPredecessors.get(0), ops[1]);
+    }
+    
+    //Swap non-existent nodes in a graph and check for exceptions
+    //Swap S1 and S4
+    //Swap S4 and S1
+    //Swap S5 and S4
+    //Swap S1 and null
+    //Swap null and S1
+    //Swap null and null
+    //Input
+    //S1->S2->S3 S4 S5
+    @Test
+    public void testNegativeSimpleSwap() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[5];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+        }
+
+        for(int i = 0; i < ops.length - 2; ++i) {
+            plan.add(ops[i]);
+        }
+
+        
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        try {
+            plan.swap(ops[0], ops[3]);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+        
+        try {
+            plan.swap(ops[3], ops[0]);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+
+        try {
+            plan.swap(ops[4], ops[3]);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+        
+        try {
+            plan.swap(ops[0], null);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1092);
+        }
+        
+        try {
+            plan.swap(null, ops[0]);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1092);
+        }
+        
+        try {
+            plan.swap(null, null);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1092);
+        }
+
+    }
+    
+    //Swap nodes that have multiple inputs and multiple outs in a graph and check for exceptions
+    //Input
+    // S1   S2
+    //  \ /
+    //   M1
+    //   |
+    //   M2
+    //  / \
+    // S3   S4
+    //Swap S1 and M1
+    //Swap M1 and S1
+    //Swap M1 and M2
+    //Swap M2 and M1
+    //Swap M2 and S3
+    //Swap S3 and M2
+    @Test
+    public void testNegativeSimpleSwap1() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[6];
+        ops[0] = new SingleOperator("1");
+        plan.add(ops[0]);
+        ops[1] = new SingleOperator("2");
+        plan.add(ops[1]);
+        ops[2] = new MultiOperator("3");
+        plan.add(ops[2]);
+        plan.connect(ops[0], ops[2]);
+        plan.connect(ops[1], ops[2]);
+        
+        ops[3] = new SingleOperator("4");
+        plan.add(ops[3]);
+        ops[4] = new SingleOperator("5");
+        plan.add(ops[4]);
+        ops[5] = new MultiOperator("6");
+        plan.add(ops[5]);
+        plan.connect(ops[5], ops[3]);
+        plan.connect(ops[5], ops[4]);
+
+        plan.connect(ops[2], ops[5]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        try {
+            plan.swap(ops[0], ops[2]);
+            fail("Expected exception for multi-input operator.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1093);
+        }
+        
+        try {
+            plan.swap(ops[2], ops[0]);
+            fail("Expected exception for multi-input operator.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1093);
+        }
+
+        try {
+            plan.swap(ops[2], ops[5]);
+            fail("Expected exception for multi-input operator.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1093);
+        }
+        
+        try {
+            plan.swap(ops[5], ops[2]);
+            fail("Expected exception for multi-output operator.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1093);
+        }
+        
+        try {
+            plan.swap(ops[5], ops[3]);
+            fail("Expected exception for multi-output operator.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1093);
+        }
+        
+        try {
+            plan.swap(ops[3], ops[5]);
+            fail("Expected exception for multi-output operator.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1093);
+        }
+
+    }
+    
+    //Push M11 before M10's inputs - 0 through 3
+    //Input
+    //  S1   S2  S3  S4
+    //   \   |   |  /
+    //        M10
+    //      /  |  \
+    //     S5  M11  S6
+    //      /  |  \
+    //     S7  S8  S9
+    //Output when pushed before 1st input
+    //       S2
+    //       |
+    //  S1   M11  S3  S4
+    //   \   |   |  /
+    //       M10
+    //   / / |  \ \
+    // S5 S7 S8 S9 S6
+    @Test
+    public void testpushBefore() throws Exception {
+        for(int index = 0; index < 4; index++) {
+            TPlan plan = new TPlan();
+            TOperator[] ops = new TOperator[11];
+            
+            for(int i = 0; i < ops.length - 2; ++i) {
+                ops[i] = new SingleOperator(Integer.toString(i+1));
+                plan.add(ops[i]);
+            }
+            
+            ops[9] = new MultiOperator("10");
+            plan.add(ops[9]);
+            
+            ops[10] = new MultiOperator("11");
+            plan.add(ops[10]);
+            
+            plan.connect(ops[0], ops[9]);
+            plan.connect(ops[1], ops[9]);
+            plan.connect(ops[2], ops[9]);
+            plan.connect(ops[3], ops[9]);
+            plan.connect(ops[9], ops[4]);
+            plan.connect(ops[9], ops[10]);
+            plan.connect(ops[9], ops[5]);
+            
+            plan.connect(ops[10], ops[6]);
+            plan.connect(ops[10], ops[7]);
+            plan.connect(ops[10], ops[8]);
+            
+            //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+            //planPrinter.visit();
+    
+            plan.pushBefore(ops[9], ops[10], index) ;
+            
+            //planPrinter.visit();
+            
+            Set<TOperator> rootSet = new HashSet<TOperator>();
+            rootSet.add(ops[0]);
+            rootSet.add(ops[1]);
+            rootSet.add(ops[2]);
+            rootSet.add(ops[3]);
+            
+            Set<TOperator> expectedRootSet = new HashSet<TOperator>(plan.getRoots());
+            
+            rootSet.retainAll(expectedRootSet);
+            assertTrue(rootSet.size() == 4);
+            
+            Set<TOperator> leafSet = new HashSet<TOperator>();
+            leafSet.add(ops[4]);
+            leafSet.add(ops[5]);
+            leafSet.add(ops[6]);
+            leafSet.add(ops[7]);
+            leafSet.add(ops[8]);
+            
+            Set<TOperator> expectedLeafSet = new HashSet<TOperator>(plan.getLeaves());
+            
+            leafSet.retainAll(expectedLeafSet);
+            assertTrue(leafSet.size() == 5);
+            
+            List<TOperator> m10Predecessors = plan.getPredecessors(ops[9]);            
+            assertTrue(m10Predecessors.get(index) == ops[10]);
+            
+            List<TOperator> m11Predecessors = plan.getPredecessors(ops[10]);
+            assertTrue(m11Predecessors.get(0) == ops[index]);
+        }
+    }
+    
+    //Push S5 and S6 before M10's input
+    //Input
+    //  S1   S2  S3  S4
+    //   \   |   |  /
+    //        M10
+    //      /  |  \
+    //     S5  M11  S6
+    //      /  |  \
+    //     S7  S8  S9
+    //Output when pushed before 1st input
+    //       S2
+    //       |
+    //  S1   S5  S3  S4
+    //   \   |   |  /
+    //        M10
+    //       /   \
+    //      M11   S6
+    //    /  |  \
+    //   S7  S8  S9
+    @Test
+    public void testpushBefore2() throws Exception {
+        for(int outerIndex = 0; outerIndex < 2; outerIndex++) {
+            for(int index = 0; index < 2; index++) {
+                TPlan plan = new TPlan();
+                TOperator[] ops = new TOperator[11];
+                
+                for(int i = 0; i < ops.length - 2; ++i) {
+                    ops[i] = new SingleOperator(Integer.toString(i+1));
+                    plan.add(ops[i]);
+                }
+                
+                ops[9] = new MultiOperator("10");
+                plan.add(ops[9]);
+                
+                ops[10] = new MultiOperator("11");
+                plan.add(ops[10]);
+                
+                plan.connect(ops[0], ops[9]);
+                plan.connect(ops[1], ops[9]);
+                plan.connect(ops[2], ops[9]);
+                plan.connect(ops[3], ops[9]);
+                plan.connect(ops[9], ops[4]);
+                plan.connect(ops[9], ops[10]);
+                plan.connect(ops[9], ops[5]);
+                
+                plan.connect(ops[10], ops[6]);
+                plan.connect(ops[10], ops[7]);
+                plan.connect(ops[10], ops[8]);
+                
+                //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+                //planPrinter.visit();
+        
+                int secondNodeIndex = outerIndex + 4;
+                plan.pushBefore(ops[9], ops[secondNodeIndex], index) ;
+                
+                //planPrinter.visit();
+                
+                Set<TOperator> rootSet = new HashSet<TOperator>();
+                rootSet.add(ops[0]);
+                rootSet.add(ops[1]);
+                rootSet.add(ops[2]);
+                rootSet.add(ops[3]);
+                
+                Set<TOperator> expectedRootSet = new HashSet<TOperator>(plan.getRoots());
+                
+                rootSet.retainAll(expectedRootSet);
+                assertTrue(rootSet.size() == 4);
+                
+                Set<TOperator> leafSet = new HashSet<TOperator>();
+                for(int leafIndex = 4; leafIndex < 9; ++leafIndex) {
+                    if(leafIndex != secondNodeIndex) {
+                        leafSet.add(ops[leafIndex]);
+                    }
+                }
+                
+                Set<TOperator> expectedLeafSet = new HashSet<TOperator>(plan.getLeaves());
+                
+                leafSet.retainAll(expectedLeafSet);
+                assertTrue(leafSet.size() == 4);
+                
+                List<TOperator> outerIndexNodePredecessors = plan.getPredecessors(ops[secondNodeIndex]);            
+                assertTrue(outerIndexNodePredecessors.get(0) == ops[index]);
+                
+                List<TOperator> m10Predecessors = plan.getPredecessors(ops[9]);            
+                assertTrue(m10Predecessors.get(index) == ops[secondNodeIndex]);
+            }
+        }
+    }
+
+    //Push non-existent nodes in a graph and check for exceptions
+    //Push S1 after S4
+    //Push S4 after S1
+    //Push S5 after S4
+    //Push S1 after null
+    //Push null after S1
+    //Push null after null
+    //Input
+    //S1->S2->S3 S4 S5
+    @Test
+    public void testNegativePushBefore() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[5];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+        }
+
+        for(int i = 0; i < ops.length - 2; ++i) {
+            plan.add(ops[i]);
+        }
+
+        
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        try {
+            plan.pushBefore(ops[0], ops[3], 0);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+        
+        try {
+            plan.pushBefore(ops[3], ops[0], 0);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+
+        try {
+            plan.pushBefore(ops[4], ops[3], 0);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+        
+        try {
+            plan.pushBefore(ops[0], null, 0);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1085);
+        }
+        
+        try {
+            plan.pushBefore(null, ops[0], 0);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1085);
+        }
+        
+        try {
+            plan.pushBefore(null, null, 0);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1085);
+        }
+
+    }
+
+    //Negative test cases
+    //Input
+    //  S1   S2  S3  S4
+    //   \   |   |  /
+    //        M10
+    //      /  |  \
+    //     S5  M11  S6
+    //      /  |  \
+    //     S7  S8  S9
+    @Test
+    public void testNegativePushBefore2() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[11];
+        
+        for(int i = 0; i < ops.length - 2; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i+1));
+            plan.add(ops[i]);
+        }
+        
+        ops[9] = new MultiOperator("10");
+        plan.add(ops[9]);
+        
+        ops[10] = new MultiOperator("11");
+        plan.add(ops[10]);
+        
+        plan.connect(ops[0], ops[9]);
+        plan.connect(ops[1], ops[9]);
+        plan.connect(ops[2], ops[9]);
+        plan.connect(ops[3], ops[9]);
+        plan.connect(ops[9], ops[4]);
+        plan.connect(ops[9], ops[10]);
+        plan.connect(ops[9], ops[5]);
+        
+        plan.connect(ops[10], ops[6]);
+        plan.connect(ops[10], ops[7]);
+        plan.connect(ops[10], ops[8]);
+        
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        try {
+            plan.pushBefore(ops[0], ops[9], 0) ;
+            fail("Expected exception for first operator having null predecessors");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1086);
+        }
+        
+        try {
+            plan.pushBefore(ops[10], ops[6], 0) ;
+            fail("Expected exception for first operator having one predecessor");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1086);
+        }
+        
+        try {
+            plan.pushBefore(ops[9], ops[10], 4) ;
+            fail("Expected exception for inputNum exceeding number of predecessors");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1087);
+        }
+        
+        try {
+            plan.pushBefore(ops[9], ops[0], 0) ;
+            fail("Expected exception for second operator having null predecessors");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1088);
+        }
+
+        try {
+            plan.pushBefore(ops[9], ops[9], 0) ;
+            fail("Expected exception for second operator having more than one predecessor");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1088);
+        }
+        
+        try {
+            plan.pushBefore(ops[9], ops[8], 0) ;
+            fail("Expected exception for second operator not being a successor of first operator");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1089);
+        }
+        
+        plan.disconnect(ops[9], ops[4]);
+        plan.disconnect(ops[9], ops[5]);
+        
+        MultiInputSingleOutputOperator miso = new MultiInputSingleOutputOperator("12");
+        
+        plan.replace(ops[9], miso);
+        
+        try {
+            plan.pushBefore(miso, ops[10], 0) ;
+            fail("Expected exception for trying to connect multiple outputs to the first operator");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1091);
+        }
+        
+    }
+
+    //Push M10 after M11's outputs - 0 through 2
+    //Input
+    //  S1   S2  S3  S4
+    //   \   |   |  /
+    //        M10
+    //    S5   |  S6
+    //     \   |  /
+    //        M11
+    //      /  |  \
+    //     S7  S8  S9
+    //Output when pushed after 1st output
+    //  S5 S1  S2  S3  S4 S6
+    //   \   \  \  /  /  /
+    //           M10
+    //         /  |  \
+    //        S7  M11  S9
+    //            |
+    //            S8
+
+    @Test
+    public void testpushAfter() throws Exception {
+        for(int index = 0; index < 3; index++) {
+            TPlan plan = new TPlan();
+            TOperator[] ops = new TOperator[11];
+            
+            for(int i = 0; i < ops.length - 2; ++i) {
+                ops[i] = new SingleOperator(Integer.toString(i+1));
+                plan.add(ops[i]);
+            }
+            
+            ops[9] = new MultiOperator("10");
+            plan.add(ops[9]);
+            
+            ops[10] = new MultiOperator("11");
+            plan.add(ops[10]);
+            
+            plan.connect(ops[0], ops[9]);
+            plan.connect(ops[1], ops[9]);
+            plan.connect(ops[2], ops[9]);
+            plan.connect(ops[3], ops[9]);
+            plan.connect(ops[9], ops[10]);
+            plan.connect(ops[4], ops[10]);
+            plan.connect(ops[5], ops[10]);
+            
+            plan.connect(ops[10], ops[6]);
+            plan.connect(ops[10], ops[7]);
+            plan.connect(ops[10], ops[8]);
+            
+            //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+            //planPrinter.visit();
+    
+            plan.pushAfter(ops[10], ops[9], index) ;
+            
+            //planPrinter.visit();
+            
+            Set<TOperator> rootSet = new HashSet<TOperator>();
+            rootSet.add(ops[0]);
+            rootSet.add(ops[1]);
+            rootSet.add(ops[2]);
+            rootSet.add(ops[3]);
+            rootSet.add(ops[4]);
+            rootSet.add(ops[5]);
+            
+            Set<TOperator> expectedRootSet = new HashSet<TOperator>(plan.getRoots());
+            
+            rootSet.retainAll(expectedRootSet);
+            assertTrue(rootSet.size() == 6);
+            
+            Set<TOperator> leafSet = new HashSet<TOperator>();
+            leafSet.add(ops[6]);
+            leafSet.add(ops[7]);
+            leafSet.add(ops[8]);
+            
+            Set<TOperator> expectedLeafSet = new HashSet<TOperator>(plan.getLeaves());
+            
+            leafSet.retainAll(expectedLeafSet);
+            assertTrue(leafSet.size() == 3);
+            
+            List<TOperator> m10Successors = plan.getSuccessors(ops[9]);            
+            assertTrue(m10Successors.get(0) == ops[index + 6]);
+            
+            List<TOperator> m11Successors = plan.getSuccessors(ops[10]);
+            assertTrue(m11Successors.get(index) == ops[9]);
+        }
+    }
+
+    //Push S5 and S6 after M11's outputs - 0 through 2
+    //Input
+    //  S1   S2  S3  S4
+    //   \   |   |  /
+    //        M10
+    //    S5   |  S6
+    //     \   |  /
+    //        M11
+    //      /  |  \
+    //     S7  S8  S9
+    //Output when S5 is pushed after 1st output
+    //  S1   S2  S3  S4
+    //   \   |   |  /
+    //        M10
+    //         |  S6
+    //         |  /
+    //        M11
+    //      /  |  \
+    //     S7  S5  S9
+    //         |
+    //         S8
+
+    @Test
+    public void testpushAfter1() throws Exception {
+        for(int outerIndex = 0; outerIndex < 2; outerIndex++) {
+            for(int index = 0; index < 3; index++) {
+                TPlan plan = new TPlan();
+                TOperator[] ops = new TOperator[11];
+                
+                for(int i = 0; i < ops.length - 2; ++i) {
+                    ops[i] = new SingleOperator(Integer.toString(i+1));
+                    plan.add(ops[i]);
+                }
+                
+                ops[9] = new MultiOperator("10");
+                plan.add(ops[9]);
+                
+                ops[10] = new MultiOperator("11");
+                plan.add(ops[10]);
+                
+                plan.connect(ops[0], ops[9]);
+                plan.connect(ops[1], ops[9]);
+                plan.connect(ops[2], ops[9]);
+                plan.connect(ops[3], ops[9]);
+                plan.connect(ops[9], ops[10]);
+                plan.connect(ops[4], ops[10]);
+                plan.connect(ops[5], ops[10]);
+                
+                plan.connect(ops[10], ops[6]);
+                plan.connect(ops[10], ops[7]);
+                plan.connect(ops[10], ops[8]);
+                
+                //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+                //planPrinter.visit();
+        
+                int secondNodeIndex = outerIndex + 4;
+                plan.pushAfter(ops[10], ops[secondNodeIndex], index) ;
+                
+                //planPrinter.visit();
+                
+                Set<TOperator> rootSet = new HashSet<TOperator>();
+                rootSet.add(ops[0]);
+                rootSet.add(ops[1]);
+                rootSet.add(ops[2]);
+                rootSet.add(ops[3]);
+
+                for(int rootIndex = 0; rootIndex < 6; ++rootIndex) {
+                    if(rootIndex != secondNodeIndex) {
+                        rootSet.add(ops[rootIndex]);
+                    }
+                }
+
+                Set<TOperator> expectedRootSet = new HashSet<TOperator>(plan.getRoots());
+                
+                rootSet.retainAll(expectedRootSet);
+                assertTrue(rootSet.size() == 5);
+
+                Set<TOperator> leafSet = new HashSet<TOperator>();
+                leafSet.add(ops[6]);
+                leafSet.add(ops[7]);
+                leafSet.add(ops[8]);
+                
+                Set<TOperator> expectedLeafSet = new HashSet<TOperator>(plan.getLeaves());
+                
+                leafSet.retainAll(expectedLeafSet);
+                assertTrue(leafSet.size() == 3);
+                
+                List<TOperator> outerIndexNodeSuccessors = plan.getSuccessors(ops[secondNodeIndex]);
+                assertTrue(outerIndexNodeSuccessors.get(0) == ops[index + 6]);
+                
+                List<TOperator> m11Successors = plan.getSuccessors(ops[10]);            
+                assertTrue(m11Successors.get(index) == ops[secondNodeIndex]);
+            }
+        }
+    }
+    
+    //Push non-existent nodes in a graph and check for exceptions
+    //Push S1 after S4
+    //Push S4 after S1
+    //Push S5 after S4
+    //Push S1 after null
+    //Push null after S1
+    //Push null after null
+    //Input
+    //S1->S2->S3 S4 S5
+    @Test
+    public void testNegativePushAfter() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[5];
+        
+        for(int i = 0; i < ops.length; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i));
+        }
+
+        for(int i = 0; i < ops.length - 2; ++i) {
+            plan.add(ops[i]);
+        }
+
+        
+        plan.connect(ops[0], ops[1]);
+        plan.connect(ops[1], ops[2]);
+
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        try {
+            plan.pushAfter(ops[0], ops[3], 0);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+        
+        try {
+            plan.pushAfter(ops[3], ops[0], 0);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+
+        try {
+            plan.pushAfter(ops[4], ops[3], 0);
+            fail("Expected exception for node not in plan.");
+        } catch (PlanException pe) {
+            assertTrue(pe.getMessage().contains("not in the plan"));
+        }
+        
+        try {
+            plan.pushAfter(ops[0], null, 0);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1085);
+        }
+        
+        try {
+            plan.pushAfter(null, ops[0], 0);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1085);
+        }
+        
+        try {
+            plan.pushAfter(null, null, 0);
+            fail("Expected exception for having null as one of the inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1085);
+        }
+
+    }
+
+    //Negative test cases
+    //Input
+    //  S1   S2  S3  S4
+    //   \   |   |  /
+    //        M10
+    //    S5   |  S6
+    //     \   |  /
+    //        M11
+    //      /  |  \
+    //     S7  S8  S9
+    @Test
+    public void testNegativePushAfter2() throws Exception {
+        TPlan plan = new TPlan();
+        TOperator[] ops = new TOperator[11];
+        
+        for(int i = 0; i < ops.length - 2; ++i) {
+            ops[i] = new SingleOperator(Integer.toString(i+1));
+            plan.add(ops[i]);
+        }
+        
+        ops[9] = new MultiOperator("10");
+        plan.add(ops[9]);
+        
+        ops[10] = new MultiOperator("11");
+        plan.add(ops[10]);
+        
+        plan.connect(ops[0], ops[9]);
+        plan.connect(ops[1], ops[9]);
+        plan.connect(ops[2], ops[9]);
+        plan.connect(ops[3], ops[9]);
+        plan.connect(ops[9], ops[10]);
+        plan.connect(ops[4], ops[10]);
+        plan.connect(ops[5], ops[10]);
+        
+        plan.connect(ops[10], ops[6]);
+        plan.connect(ops[10], ops[7]);
+        plan.connect(ops[10], ops[8]);
+        
+        //PlanPrinter<TOperator, TPlan> planPrinter = new PlanPrinter<TOperator, TPlan>(System.err, plan);
+        //planPrinter.visit();
+
+        try {
+            plan.pushAfter(ops[6], ops[9], 0) ;
+            fail("Expected exception for first operator having null successors");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1086);
+        }
+        
+        try {
+            plan.pushAfter(ops[0], ops[9], 0) ;
+            fail("Expected exception for first operator having no inputs");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1088);
+        }
+        
+        try {
+            plan.pushAfter(ops[9], ops[6], 0) ;
+            fail("Expected exception for first operator having one successor");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1086);
+        }
+        
+        try {
+            plan.pushAfter(ops[6], ops[10], 0) ;
+            fail("Expected exception for first operator having one successors");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1086);
+        }
+
+        
+        try {
+            plan.pushAfter(ops[10], ops[9], 4) ;
+            fail("Expected exception for outputNum exceeding the number of outputs of first operator");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1087);
+        }
+
+        try {
+            plan.pushAfter(ops[10], ops[6], 0) ;
+            fail("Expected exception for second operator having null successors");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1088);
+        }
+        
+        try {
+            plan.pushAfter(ops[10], ops[10], 0) ;
+            fail("Expected exception for second operator having more than one successor");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1088);
+        }
+        
+        try {
+            plan.pushAfter(ops[10], ops[0], 0) ;
+            fail("Expected exception for second operator not being a predecessor of first operator");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1089);
+        }
+        
+        plan.disconnect(ops[4], ops[10]);
+        plan.disconnect(ops[5], ops[10]);
+        
+        MultiOutputSingleInputOperator mosi = new MultiOutputSingleInputOperator("12");
+        
+        plan.replace(ops[10], mosi);
+        
+        try {
+            plan.pushAfter(mosi, ops[9], 0) ;
+            fail("Expected exception for trying to connect multiple inputs to the first operator");
+        } catch (PlanException pe) {
+            assertTrue(pe.getErrorCode() == 1091);
+        }
+        
+    }
+
 }