You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/04/21 01:03:11 UTC
svn commit: r766907 - in /hadoop/pig/trunk:
src/org/apache/pig/impl/plan/OperatorPlan.java
test/org/apache/pig/test/TestBestFitCast.java
test/org/apache/pig/test/TestOperatorPlan.java
Author: sms
Date: Mon Apr 20 23:03:10 2009
New Revision: 766907
URL: http://svn.apache.org/viewvc?rev=766907&view=rev
Log:
PIG-693: Proposed improvements to pig's optimizer
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=766907&r1=766906&r2=766907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Mon Apr 20 23:03:10 2009
@@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Set;
+import org.apache.pig.PigException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -416,7 +417,7 @@
* @param newNode new node to insert. This node must have already been
* added to the plan.
* @param before Node to insert this node before
- * @throws PlanException if it encounters trouble disconecting or
+ * @throws PlanException if it encounters trouble disconnecting or
* connecting nodes.
*/
public void insertBetween(
@@ -436,20 +437,48 @@
// replaces (src -> dst) entry in multiMap with (src -> replacement)
private boolean replaceNode(E src, E replacement, E dst, MultiMap<E, E> multiMap) {
- Collection c = multiMap.get(src);
- if (c == null) return false;
-
- ArrayList al = new ArrayList(c);
- for(int i = 0; i < al.size(); ++i) {
- E to = (E)al.get(i);
- if(to.equals(dst)) {
- al.set(i, replacement);
- multiMap.removeKey(src);
- multiMap.put(src, al);
+ if(multiMap == null) return false;
+
+ if(src == null) return false;
+
+ List<E> nodes = (ArrayList<E>)multiMap.get(src);
+ if (nodes == null) {
+ //we need to add replacement to the multimap as long as replacement != null
+ if(replacement == null) {
+ return false;
+ } else if (dst == null) {
+ ArrayList<E> replacementNodes = new ArrayList<E>();
+ replacementNodes.add(replacement);
+ multiMap.put(src, replacementNodes);
return true;
+ } else {
+ return false;
+ }
+ }
+
+ if(dst == null) return false;
+
+ boolean replaced = false;
+ ArrayList<E> replacementNodes = new ArrayList<E>();
+ for(int i = 0; i < nodes.size(); ++i) {
+ E to = nodes.get(i);
+ if(to.equals(dst)) {
+ replaced = true;
+ if(replacement != null) {
+ replacementNodes.add(replacement);
+ }
+ } else {
+ replacementNodes.add(to);
+ }
+ }
+
+ if(replaced) {
+ multiMap.removeKey(src);
+ if(replacementNodes.size() > 0) {
+ multiMap.put(src, replacementNodes);
}
}
- return false;
+ return replaced;
}
/**
@@ -536,20 +565,7 @@
if (pred != null && succ != null) connect(pred, succ);
}
- /**
- * Remove a node in a way that connects the node's predecessor (if any)
- * with the node's successors (if any). This function handles the
- * case where the node has *one* predecessor and one or more successors.
- * It replaces the predecessor in same position as node was in
- * each of the successors predecessor list(getPredecessors()), to
- * preserve input ordering
- * for eg, it is used to remove redundant project(*) from plan
- * which will have only one predecessor,but can have multiple success
- * @param node Node to be removed
- * @throws PlanException if the node has more than one predecessor
- */
- public void removeAndReconnectMultiSucc(E node) throws PlanException {
-
+ private void reconnectSuccessors(E node, boolean successorRequired, boolean removeNode) throws PlanException {
// Before:
// A (predecessor (only one) )
// / |
@@ -563,7 +579,7 @@
// X C1 C2 C3 ...
// the variable names are from above example
- E nodeB = node;
+ E nodeB = node;
List<E> preds = getPredecessors(nodeB);
//checking pre-requisite conditions
if (preds == null || preds.size() != 1) {
@@ -583,57 +599,186 @@
Collection<E> nodeC = mFromEdges.get(nodeB);
//checking pre-requisite conditions
- if (nodeC == null || nodeC.size() == 0) {
- PlanException pe = new PlanException("Attempt to remove " +
- " and reconnect for node with no successors.");
- log.error(pe.getMessage());
- throw pe;
- }
+ if(successorRequired) {
+ if (nodeC == null || nodeC.size() == 0) {
+ PlanException pe = new PlanException("Attempt to remove " +
+ " and reconnect for node with no successors.");
+ log.error(pe.getMessage());
+ throw pe;
+ }
+ }
// replace B in A.succesors and add B.successors(ie C) to it
replaceAndAddSucessors(nodeA, nodeB);
// for all C(succs) , replace B(node) in predecessors, with A(pred)
- for(E c: nodeC) {
- Collection<E> sPreds = mToEdges.get(c);
- ArrayList<E> newPreds = new ArrayList<E>(sPreds.size());
- for(E p: sPreds){
- if(p == nodeB){
- //replace
- newPreds.add(nodeA);
+ if(nodeC != null) {
+ for(E c: nodeC) {
+ Collection<E> sPreds = mToEdges.get(c);
+ ArrayList<E> newPreds = new ArrayList<E>(sPreds.size());
+ for(E p: sPreds){
+ if(p == nodeB){
+ //replace
+ newPreds.add(nodeA);
+ }
+ else{
+ newPreds.add(p);
+ }
}
- else{
- newPreds.add(p);
+ mToEdges.removeKey(c);
+ mToEdges.put(c,newPreds);
+
+ }
+ }
+
+ if(removeNode) {
+ remove(nodeB);
+ } else {
+ //make sure that the node does not have any dangling from and to edges
+ mFromEdges.removeKey(nodeB);
+ mToEdges.removeKey(nodeB);
+ }
+ }
+
+ private void reconnectPredecessors(E node, boolean predecessorRequired, boolean removeNode) throws PlanException {
+ // Before:
+ // C1 C2 C3 ... (Predecessors)
+ // \ | / \
+ // X B(nodeB) Y(some successor of a Cn)
+ // \ |
+ // A (successor (only one) )
+
+
+ // should become
+ // After:
+ // X C1 C2 C3 ...
+ // \ \ | / \
+ // A Y
+ // the variable names are from above example
+
+ E nodeB = node;
+ List<E> nodeBsuccessors = getSuccessors(nodeB);
+ //checking pre-requisite conditions
+ if (nodeBsuccessors == null || nodeBsuccessors.size() != 1) {
+ Integer size = null;
+ if(nodeBsuccessors != null)
+ size = nodeBsuccessors.size();
+
+ PlanException pe = new PlanException("Attempt to remove " +
+ " and reconnect for node with " + size +
+ " successors.");
+ log.error(pe.getMessage());
+ throw pe;
+ }
+
+ //A and C
+ E nodeA = nodeBsuccessors.get(0);
+ Collection<E> nodeC = mToEdges.get(nodeB);
+
+ //checking pre-requisite conditions
+ if(predecessorRequired) {
+ if (nodeC == null || nodeC.size() == 0) {
+ PlanException pe = new PlanException("Attempt to remove " +
+ " and reconnect for node with no predecessors.");
+ log.error(pe.getMessage());
+ throw pe;
+ }
+ }
+
+
+ // replace B in A.predecessors and add B.predecessors(ie C) to it
+ replaceAndAddPredecessors(nodeA, nodeB);
+
+ // for all C(predecessors) , replace B(node) in successors, with A(successor)
+ if(nodeC != null) {
+ for(E c: nodeC) {
+ Collection<E> sPreds = mFromEdges.get(c);
+ ArrayList<E> newPreds = new ArrayList<E>(sPreds.size());
+ for(E p: sPreds){
+ if(p == nodeB){
+ //replace
+ newPreds.add(nodeA);
+ }
+ else{
+ newPreds.add(p);
+ }
}
+ mFromEdges.removeKey(c);
+ mFromEdges.put(c,newPreds);
+
}
- mToEdges.removeKey(c);
- mToEdges.put(c,newPreds);
-
}
- remove(nodeB);
+
+ if(removeNode) {
+ remove(nodeB);
+ } else {
+ //make sure that the node does not have any dangling from and to edges
+ mFromEdges.removeKey(nodeB);
+ mToEdges.removeKey(nodeB);
+ }
}
- //removes entry for succ in list of successors of nd, and adds successors
- // of succ in its place
- // @param nd - parent node whose entry for succ needs to be replaced
- // @param succ - see above
- private void replaceAndAddSucessors(E nd, E succ) throws PlanException {
- Collection<E> oldSuccs = mFromEdges.get(nd);
- Collection<E> repSuccs = mFromEdges.get(succ);
- ArrayList<E> newSuccs = new ArrayList<E>(oldSuccs.size() - 1 + repSuccs.size() );
- for(E s: oldSuccs){
- if(s == succ){
- newSuccs.addAll(repSuccs);
+ // removes entry for successor in list of successors of node
+ // and adds successors of successor in its place
+ // @param node - parent node whose entry for successor needs to be replaced
+ // @param successor - see above
+ private void replaceAndAddSucessors(E node, E successor) throws PlanException {
+ Collection<E> oldSuccessors = mFromEdges.get(node);
+ Collection<E> replacementSuccessors = mFromEdges.get(successor);
+ ArrayList<E> newSuccessors = new ArrayList<E>();
+ for(E s: oldSuccessors){
+ if(s == successor){
+ if(replacementSuccessors != null) {
+ newSuccessors.addAll(replacementSuccessors);
+ }
+ }else{
+ newSuccessors.add(s);
+ }
+ }
+ mFromEdges.removeKey(node);
+ mFromEdges.put(node,newSuccessors);
+ }
+
+ // removes entry for predecessor in list of predecessors of node,
+ // and adds predecessors of predecessor in its place
+ // @param node - child node whose entry for predecessor needs to be replaced
+ // @param predecessor - see above
+ private void replaceAndAddPredecessors(E node, E predecessor) throws PlanException {
+ Collection<E> oldPredecessors = mToEdges.get(node);
+ Collection<E> replacementPredecessors = mToEdges.get(predecessor);
+ ArrayList<E> newPredecessors = new ArrayList<E>();
+ for(E p: oldPredecessors){
+ if(p == predecessor){
+ if(replacementPredecessors != null) {
+ newPredecessors.addAll(replacementPredecessors);
+ }
}else{
- newSuccs.add(s);
+ newPredecessors.add(p);
}
}
- mFromEdges.removeKey(nd);
- mFromEdges.put(nd,newSuccs);
+ mToEdges.removeKey(node);
+ mToEdges.put(node,newPredecessors);
+ }
+
+ /**
+ * Remove a node in a way that connects the node's predecessor (if any)
+ * with the node's successors (if any). This function handles the
+ * case where the node has *one* predecessor and one or more successors.
+ * It replaces the predecessor in same position as node was in
+ * each of the successors predecessor list(getPredecessors()), to
+ * preserve input ordering
+ * for eg, it is used to remove redundant project(*) from plan
+ * which will have only one predecessor, but can have multiple successors.
+ * @param node Node to be removed
+ * @throws PlanException if the node does not have exactly one predecessor, or has no successors
+ */
+ public void removeAndReconnectMultiSucc(E node) throws PlanException {
+ reconnectSuccessors(node, true, true);
}
+
public void dump(PrintStream ps) {
ps.println("Ops");
for (E op : mOps.keySet()) {
@@ -660,5 +805,405 @@
pp.print(out);
}
+ /**
+ * Swap two operators in a plan. Each operator must have at most one
+ * input and at most one output.
+ * @param first operator
+ * @param second operator
+ * @throws PlanException if either operator is null, or has more than one input or output.
+ */
+ public void swap(E first, E second) throws PlanException {
+ E firstNode = first;
+ E secondNode = second;
+
+ if(firstNode == null) {
+ int errCode = 1092;
+ String msg = "First operator in swap is null. Cannot swap null operators.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ if(secondNode == null) {
+ int errCode = 1092;
+ String msg = "Second operator in swap is null. Cannot swap null operators.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ checkInPlan(firstNode);
+ checkInPlan(secondNode);
+
+ List<E> firstNodePredecessors = (ArrayList<E>)mToEdges.get(firstNode);
+
+ if(firstNodePredecessors != null && firstNodePredecessors.size() > 1) {
+ int errCode = 1093;
+ String msg = "Swap supports swap of operators with at most one input."
+ + " Found first operator with " + firstNodePredecessors.size() + " inputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> firstNodeSuccessors = (ArrayList<E>)mFromEdges.get(firstNode);
+
+ if(firstNodeSuccessors != null && firstNodeSuccessors.size() > 1) {
+ int errCode = 1093;
+ String msg = "Swap supports swap of operators with at most one output."
+ + " Found first operator with " + firstNodeSuccessors.size() + " outputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> secondNodePredecessors = (ArrayList<E>)mToEdges.get(secondNode);
+
+ if(secondNodePredecessors != null && secondNodePredecessors.size() > 1) {
+ int errCode = 1093;
+ String msg = "Swap supports swap of operators with at most one input."
+ + " Found second operator with " + secondNodePredecessors.size() + " inputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> secondNodeSuccessors = (ArrayList<E>)mFromEdges.get(secondNode);
+
+ if(secondNodeSuccessors != null && secondNodeSuccessors.size() > 1) {
+ int errCode = 1093;
+ String msg = "Swap supports swap of operators with at most one output."
+ + " Found second operator with " + secondNodeSuccessors.size() + " outputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ E firstNodePredecessor = null;
+ E firstNodeSuccessor = null;
+ E secondNodePredecessor = null;
+ E secondNodeSuccessor = null;
+
+ if(firstNodePredecessors != null) {
+ firstNodePredecessor = firstNodePredecessors.get(0);
+ }
+
+ if(firstNodeSuccessors != null) {
+ firstNodeSuccessor = firstNodeSuccessors.get(0);
+ }
+
+ if(secondNodePredecessors != null) {
+ secondNodePredecessor = secondNodePredecessors.get(0);
+ }
+
+ if(secondNodeSuccessors != null) {
+ secondNodeSuccessor = secondNodeSuccessors.get(0);
+ }
+
+ boolean immediateNodes = false;
+
+ if((firstNodeSuccessor == secondNode) && (secondNodePredecessor == firstNode)) {
+ immediateNodes = true;
+ } else if ((secondNodeSuccessor == firstNode) && (firstNodePredecessor == secondNode)) {
+ immediateNodes = true;
+ //swap the firstNode and secondNode
+ E tmpNode = firstNode;
+ firstNode = secondNode;
+ secondNode = tmpNode;
+
+ //swap the predecessor and successor nodes
+ tmpNode = firstNodePredecessor;
+ firstNodePredecessor = secondNodePredecessor;
+ secondNodePredecessor = tmpNode;
+
+ tmpNode = firstNodeSuccessor;
+ firstNodeSuccessor = secondNodeSuccessor;
+ secondNodeSuccessor = tmpNode;
+ }
+
+ if(immediateNodes) {
+ //Replace the predecessors and successors of first and second in their respective edge lists
+ replaceNode(firstNode, secondNodeSuccessor, firstNodeSuccessor, mFromEdges);
+ replaceNode(firstNode, secondNode, firstNodePredecessor, mToEdges);
+ replaceNode(secondNode, firstNode, secondNodeSuccessor, mFromEdges);
+ replaceNode(secondNode, firstNodePredecessor, secondNodePredecessor, mToEdges);
+ } else {
+ //Replace the predecessors and successors of first and second in their respective edge lists
+ replaceNode(firstNode, secondNodeSuccessor, firstNodeSuccessor, mFromEdges);
+ replaceNode(firstNode, secondNodePredecessor, firstNodePredecessor, mToEdges);
+ replaceNode(secondNode, firstNodeSuccessor, secondNodeSuccessor, mFromEdges);
+ replaceNode(secondNode, firstNodePredecessor, secondNodePredecessor, mToEdges);
+ }
+
+ //Replace first with second in the edges list for first's predecessor and successor
+ replaceNode(firstNodePredecessor, secondNode, firstNode, mFromEdges);
+ replaceNode(firstNodeSuccessor, secondNode, firstNode, mToEdges);
+
+ //Replace second with first in the edges list for second's predecessor and successor
+ replaceNode(secondNodePredecessor, firstNode, secondNode, mFromEdges);
+ replaceNode(secondNodeSuccessor, firstNode, secondNode, mToEdges);
+
+ markDirty();
+ }
+
+ /**
+ * Push one operator in front of another. This function is for use when
+ * the first operator has multiple inputs. The caller can specify
+ * which input of the first operator the second operator should be pushed to.
+ * @param first operator, assumed to have multiple inputs.
+ * @param second operator, will be pushed in front of first
+ * @param inputNum indicates which input of the first operator the second
+ * operator will be pushed onto. Numbered from 0.
+ * @throws PlanException if inputNum does not exist for first operator
+ */
+ public void pushBefore(E first, E second, int inputNum) throws PlanException {
+ E firstNode = first;
+ E secondNode = second;
+
+ if(firstNode == null) {
+ int errCode = 1085;
+ String msg = "First operator in pushBefore is null. Cannot pushBefore null operators.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ if(secondNode == null) {
+ int errCode = 1085;
+ String msg = "Second operator in pushBefore is null. Cannot pushBefore null operators.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ checkInPlan(firstNode);
+ checkInPlan(secondNode);
+
+ List<E> firstNodePredecessors = (ArrayList<E>)mToEdges.get(firstNode);
+
+ if(firstNodePredecessors == null || firstNodePredecessors.size() <= 1) {
+ int size = (firstNodePredecessors == null ? 0 : firstNodePredecessors.size());
+ int errCode = 1086;
+ String msg = "First operator in pushBefore should have multiple inputs."
+ + " Found first operator with " + size + " inputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ if(inputNum >= firstNodePredecessors.size()) {
+ int errCode = 1087;
+ String msg = "The inputNum " + inputNum + " should be lesser than the number of inputs of the first operator."
+ + " Found first operator with " + firstNodePredecessors.size() + " inputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> firstNodeSuccessors = (ArrayList<E>)mFromEdges.get(firstNode);
+
+ if(firstNodeSuccessors == null) {
+ int errCode = 1088;
+ String msg = "First operator in pushBefore should have at least one output."
+ + " Found first operator with no outputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> secondNodePredecessors = (ArrayList<E>)mToEdges.get(secondNode);
+
+ if(secondNodePredecessors == null || secondNodePredecessors.size() > 1) {
+ int size = (secondNodePredecessors == null ? 0 : secondNodePredecessors.size());
+ int errCode = 1088;
+ String msg = "Second operator in pushBefore should have one input."
+ + " Found second operator with " + size + " inputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> secondNodeSuccessors = (ArrayList<E>)mFromEdges.get(secondNode);
+
+ //check for multiple edges from first to second
+ int edgesFromFirstToSecond = 0;
+ for(E node: firstNodeSuccessors) {
+ if(node == secondNode) {
+ ++edgesFromFirstToSecond;
+ }
+ }
+
+ if(edgesFromFirstToSecond == 0) {
+ int errCode = 1089;
+ String msg = "Second operator in pushBefore should be the successor of the First operator.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ } else if (edgesFromFirstToSecond > 1) {
+ int errCode = 1090;
+ String msg = "Second operator can have at most one incoming edge from First operator."
+ + " Found " + edgesFromFirstToSecond + " edges.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
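+ //check if E (i.e., firstNode) can support multiple outputs before we short-circuit. NOTE(review): firstNodeSuccessors always contains secondNode here, so the size() > 0 test below is always true -- confirm intended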
+
+ if(!firstNode.supportsMultipleOutputs()) {
+ int numSecondNodeSuccessors = (secondNodeSuccessors == null? 0 : secondNodeSuccessors.size());
+ if((firstNodeSuccessors.size() > 0) || (numSecondNodeSuccessors > 0)) {
+ int errCode = 1091;
+ String msg = "First operator does not support multiple outputs."
+ + " On completing the pushBefore operation First operator will end up with "
+ + (firstNodeSuccessors.size() + numSecondNodeSuccessors) + " edges.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+ }
+
+ //Assume that we have a graph which is like
+ // A B C D
+ // \ | | /
+ // E
+ // / | \
+ // F G H
+ // / | \
+ // I J K
+ //
+ //Now pushBefore(E, G, 1)
+ //This can be done using the following sequence of transformations
+ //1. Promote G's successors as E's successors using reconnectSuccessors(G)
+ //2. Insert G between B and E using insertBetween(B, G, E)
+ //The graphs after each step
+ //Step 1 - Note that G is standing alone
+ // A B C D G
+ // \ | | /
+ // E
+ // / / | \ \
+ // F I J K H
+ //Step 2
+ // B
+ // |
+ // A G C D
+ // \ | | /
+ // E
+ // / / | \ \
+ // F I J K H
+
+ reconnectSuccessors(secondNode, false, false);
+ insertBetween(firstNodePredecessors.get(inputNum), secondNode, firstNode);
+
+ markDirty();
+ return;
+ }
+
+ /**
+ * Push one operator after another. This function is for use when the first
+ * operator has multiple outputs. The caller can specify which output of the
+ * first operator the second operator should be pushed to.
+ * @param first operator, assumed to have multiple outputs
+ * @param second operator, will be pushed after the first operator
+ * @param outputNum indicates which output of the first operator the second
+ * operator will be pushed onto. Numbered from 0.
+ * @throws PlanException if outputNum does not exist for first operator
+ */
+ public void pushAfter(E first, E second, int outputNum) throws PlanException {
+ E firstNode = first;
+ E secondNode = second;
+
+ if(firstNode == null) {
+ int errCode = 1085;
+ String msg = "First operator in pushAfter is null. Cannot pushBefore null operators.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ if(secondNode == null) {
+ int errCode = 1085;
+ String msg = "Second operator in pushAfter is null. Cannot pushBefore null operators.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ checkInPlan(firstNode);
+ checkInPlan(secondNode);
+
+ List<E> firstNodePredecessors = (ArrayList<E>)mToEdges.get(firstNode);
+
+ if(firstNodePredecessors == null) {
+ int errCode = 1088;
+ String msg = "First operator in pushAfter should have at least one input."
+ + " Found first operator with no inputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> firstNodeSuccessors = (ArrayList<E>)mFromEdges.get(firstNode);
+
+ if(firstNodeSuccessors == null || firstNodeSuccessors.size() <= 1) {
+ int size = (firstNodeSuccessors == null ? 0 : firstNodeSuccessors.size());
+ int errCode = 1086;
+ String msg = "First operator in pushAfter should have multiple outputs."
+ + " Found first operator with " + size + " outputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ if(outputNum >= firstNodeSuccessors.size()) {
+ int errCode = 1087;
+ String msg = "The outputNum " + outputNum + " should be lesser than the number of outputs of the first operator."
+ + " Found first operator with " + firstNodeSuccessors.size() + " outputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+ List<E> secondNodePredecessors = (ArrayList<E>)mToEdges.get(secondNode);
+
+ List<E> secondNodeSuccessors = (ArrayList<E>)mFromEdges.get(secondNode);
+
+ if(secondNodeSuccessors == null || secondNodeSuccessors.size() > 1) {
+ int size = (secondNodeSuccessors == null ? 0 : secondNodeSuccessors.size());
+ int errCode = 1088;
+ String msg = "Second operator in pushAfter should have one output."
+ + " Found second operator with " + size + " outputs.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
+
+ //check for multiple edges from second to first
+ int edgesFromSecondToFirst = 0;
+ for(E node: secondNodeSuccessors) {
+ if(node == firstNode) {
+ ++edgesFromSecondToFirst;
+ }
+ }
+
+ if(edgesFromSecondToFirst == 0) {
+ int errCode = 1089;
+ String msg = "Second operator in pushAfter should be the predecessor of the First operator.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ } else if (edgesFromSecondToFirst > 1) {
+ int errCode = 1090;
+ String msg = "Second operator can have at most one outgoing edge from First operator."
+ + " Found " + edgesFromSecondToFirst + " edges.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+
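+ //check if G (i.e., firstNode) can support multiple inputs before we short-circuit. NOTE(review): firstNodePredecessors presumably contains secondNode here, making the size() > 0 test below always true -- confirm intended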
+
+ if(!firstNode.supportsMultipleInputs()) {
+ int numSecondNodePredecessors = (secondNodePredecessors == null? 0 : secondNodePredecessors.size());
+ if((firstNodePredecessors.size() > 0) || (numSecondNodePredecessors > 0)) {
+ int errCode = 1091;
+ String msg = "First operator does not support multiple inputs."
+ + " On completing the pushAfter operation First operator will end up with "
+ + (firstNodePredecessors.size() + numSecondNodePredecessors) + " edges.";
+ throw new PlanException(msg, errCode, PigException.INPUT);
+ }
+ }
+
+ //Assume that we have a graph which is like
+ // A B C D
+ // \ | | /
+ // E
+ // |
+ // G
+ // / | \
+ // I J K
+ //
+ //Now pushAfter(G, E, 1)
+ //This can be done using the following sequence of transformations
+ //1. Promote E's predecessors as G's predecessors using reconnectPredecessors(E)
+ //2. Insert E between G and J using insertBetween(G, E, J)
+ //The graphs after each step
+ //Step 1 - Note that E is standing alone
+ // A B C D E
+ // \ | | /
+ // G
+ // / | \
+ // I J K
+ //Step 2
+ // A B C D
+ // \ | | /
+ // G
+ // / | \
+ // I E K
+ // |
+ // J
+
+ reconnectPredecessors(secondNode, false, false);
+ insertBetween(firstNode, secondNode, firstNodeSuccessors.get(outputNum));
+
+ markDirty();
+ return;
+
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java?rev=766907&r1=766906&r2=766907&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestBestFitCast.java Mon Apr 20 23:03:10 2009
@@ -43,9 +43,6 @@
import org.junit.Before;
import org.junit.Test;
-import com.sun.org.apache.bcel.internal.ExceptionConstants;
-import com.sun.org.apache.xerces.internal.impl.xpath.regex.ParseException;
-
import junit.framework.TestCase;
public class TestBestFitCast extends TestCase {
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java?rev=766907&r1=766906&r2=766907&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestOperatorPlan.java Mon Apr 20 23:03:10 2009
@@ -78,7 +78,7 @@
@Override
public void visit(PlanVisitor v) throws VisitorException {
- ((TVisitor)v).visitSingleOperator(this);
+ ((TVisitor)v).visit(this);
}
public String name() {
@@ -102,7 +102,7 @@
}
public void visit(PlanVisitor v) throws VisitorException {
- ((TVisitor)v).visitMultiOperator(this);
+ ((TVisitor)v).visit(this);
}
public String name() {
@@ -111,6 +111,56 @@
}
}
+
+ class MultiInputSingleOutputOperator extends TOperator {
+ MultiInputSingleOutputOperator(String name) {
+ super(name);
+ }
+
+ public boolean supportsMultipleInputs() {
+ return true;
+ }
+
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public void visit(PlanVisitor v) throws VisitorException {
+ ((TVisitor)v).visit(this);
+ }
+
+ public String name() {
+ //return this.getClass().getName() + " " + mName
+ return mName;
+ }
+
+ }
+
+ class MultiOutputSingleInputOperator extends TOperator {
+ MultiOutputSingleInputOperator(String name) {
+ super(name);
+ }
+
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ public boolean supportsMultipleOutputs() {
+ return true;
+ }
+
+ @Override
+ public void visit(PlanVisitor v) throws VisitorException {
+ ((TVisitor)v).visit(this);
+ }
+
+ public String name() {
+ //return this.getClass().getName() + " " + mName
+ return mName;
+ }
+
+ }
class TPlan extends OperatorPlan<TOperator> {
@@ -166,15 +216,25 @@
mJournal = new StringBuilder();
}
- public void visitSingleOperator(SingleOperator so) throws VisitorException {
+ public void visit(SingleOperator so) throws VisitorException {
mJournal.append(so.name());
mJournal.append(' ');
}
- public void visitMultiOperator(MultiOperator mo) throws VisitorException {
+ public void visit(MultiOperator mo) throws VisitorException {
mJournal.append(mo.name());
mJournal.append(' ');
}
+
+ public void visit(MultiInputSingleOutputOperator miso) throws VisitorException {
+ mJournal.append(miso.name());
+ mJournal.append(' ');
+ }
+
+ public void visit(MultiOutputSingleInputOperator mosi) throws VisitorException {
+ mJournal.append(mosi.name());
+ mJournal.append(' ');
+ }
public String getJournal() {
return mJournal.toString();
@@ -1760,5 +1820,1125 @@
assertFalse(neverTransform.mTransformed);
assertTrue(neverTransform.getNumberOfChecks() == 0);
}
+
+ //Swap two roots in a graph. Both the roots are disconnected
+ //and are the only nodes in the graph
+ @Test
+ public void testSwapRootsInDisconnectedGraph() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[2];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ plan.add(ops[i]);
+ }
+
+ plan.swap(ops[0], ops[1]);
+
+ // With no edges, both operators must still be roots afterwards. The size
+ // check keeps an empty result from passing vacuously; comparing as sets
+ // keeps the test independent of the order getRoots() happens to return.
+ List<TOperator> roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(ops.length, roots.size());
+
+ Set<TOperator> expectedRoots = new HashSet<TOperator>();
+ for(TOperator op : ops) {
+ expectedRoots.add(op);
+ }
+ assertEquals(expectedRoots, new HashSet<TOperator>(roots));
+ }
+
+ //Swap two nodes in a graph.
+ //Input
+ //S1->S2
+ //Output
+ //S2->S1
+ //Swap again
+ //Output
+ //S1->S2
+ @Test
+ public void testSimpleSwap() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[2];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ plan.add(ops[i]);
+ }
+
+ plan.connect(ops[0], ops[1]);
+
+ // First swap reverses the edge: ops[1] -> ops[0].
+ plan.swap(ops[0], ops[1]);
+
+ List<TOperator> roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[1], roots.get(0));
+
+ List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[0], rootSuccessors.get(0));
+
+ List<TOperator> leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[0], leaves.get(0));
+
+ List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[1], leafPredecessors.get(0));
+
+ // Swapping the same pair again must restore ops[0] -> ops[1].
+ plan.swap(ops[0], ops[1]);
+
+ roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[0], roots.get(0));
+
+ rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[1], rootSuccessors.get(0));
+
+ leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[1], leaves.get(0));
+
+ leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[0], leafPredecessors.get(0));
+ }
+
+ //Swap two nodes in a graph.
+ //Swap S1 and S3
+ //Input
+ //S1->S2->S3
+ //Intermediate Output
+ //S3->S2->S1
+ //Output
+ //S1->S2->S3
+ @Test
+ public void testSimpleSwap2() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[3];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ plan.add(ops[i]);
+ }
+
+ plan.connect(ops[0], ops[1]);
+ plan.connect(ops[1], ops[2]);
+
+ // Swapping the two ends reverses the chain: ops[2] -> ops[1] -> ops[0].
+ plan.swap(ops[0], ops[2]);
+
+ List<TOperator> roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[2], roots.get(0));
+
+ List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[1], rootSuccessors.get(0));
+
+ List<TOperator> leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[0], leaves.get(0));
+
+ List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[1], leafPredecessors.get(0));
+
+ // Swapping the same pair again must restore ops[0] -> ops[1] -> ops[2].
+ plan.swap(ops[0], ops[2]);
+
+ roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[0], roots.get(0));
+
+ rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[1], rootSuccessors.get(0));
+
+ leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[2], leaves.get(0));
+
+ leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[1], leafPredecessors.get(0));
+ }
+
+ //Swap two nodes in a graph and then swap it back again.
+ //Swap S2 and S3
+ //Input
+ //S1->S2->S3
+ //Intermediate Output
+ //S1->S3->S2
+ //Output
+ //S1->S2->S3
+ @Test
+ public void testSimpleSwap3() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[3];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ plan.add(ops[i]);
+ }
+
+ plan.connect(ops[0], ops[1]);
+ plan.connect(ops[1], ops[2]);
+
+ // Swapping the last two nodes gives ops[0] -> ops[2] -> ops[1].
+ plan.swap(ops[1], ops[2]);
+
+ List<TOperator> roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[0], roots.get(0));
+
+ List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[2], rootSuccessors.get(0));
+
+ List<TOperator> leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[1], leaves.get(0));
+
+ List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[2], leafPredecessors.get(0));
+
+ // Swapping the same pair again must restore ops[0] -> ops[1] -> ops[2].
+ plan.swap(ops[1], ops[2]);
+
+ roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[0], roots.get(0));
+
+ rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[1], rootSuccessors.get(0));
+
+ leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[2], leaves.get(0));
+
+ leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[1], leafPredecessors.get(0));
+ }
+
+ //Swap two nodes in a graph and then swap it back again.
+ //Swap S1 and S2
+ //Input
+ //S1->S2->S3
+ //Intermediate Output
+ //S2->S1->S3
+ //Output
+ //S1->S2->S3
+ @Test
+ public void testSimpleSwap4() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[3];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ plan.add(ops[i]);
+ }
+
+ plan.connect(ops[0], ops[1]);
+ plan.connect(ops[1], ops[2]);
+
+ // Swapping the first two nodes gives ops[1] -> ops[0] -> ops[2].
+ plan.swap(ops[0], ops[1]);
+
+ List<TOperator> roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[1], roots.get(0));
+
+ List<TOperator> rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[0], rootSuccessors.get(0));
+
+ List<TOperator> leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[2], leaves.get(0));
+
+ List<TOperator> leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[0], leafPredecessors.get(0));
+
+ // Swapping the same pair again must restore ops[0] -> ops[1] -> ops[2].
+ plan.swap(ops[0], ops[1]);
+
+ roots = new ArrayList<TOperator>(plan.getRoots());
+ assertEquals(1, roots.size());
+ assertEquals(ops[0], roots.get(0));
+
+ rootSuccessors = plan.getSuccessors(roots.get(0));
+ assertEquals(ops[1], rootSuccessors.get(0));
+
+ leaves = new ArrayList<TOperator>(plan.getLeaves());
+ assertEquals(1, leaves.size());
+ assertEquals(ops[2], leaves.get(0));
+
+ leafPredecessors = plan.getPredecessors(leaves.get(0));
+ assertEquals(ops[1], leafPredecessors.get(0));
+ }
+
+ //Swap non-existent nodes in a graph and check for exceptions
+ //Swap S1 and S4
+ //Swap S4 and S1
+ //Swap S5 and S4
+ //Swap S1 and null
+ //Swap null and S1
+ //Swap null and null
+ //Input
+ //S1->S2->S3 S4 S5
+ @Test
+ public void testNegativeSimpleSwap() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[5];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ }
+
+ // Only the first three operators join the plan; ops[3] and ops[4] stay
+ // outside it so they can serve as "not in the plan" arguments.
+ for(int i = 0; i < ops.length - 2; ++i) {
+ plan.add(ops[i]);
+ }
+
+ plan.connect(ops[0], ops[1]);
+ plan.connect(ops[1], ops[2]);
+
+ try {
+ plan.swap(ops[0], ops[3]);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ try {
+ plan.swap(ops[3], ops[0]);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ try {
+ plan.swap(ops[4], ops[3]);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ // Error code 1092 is expected whenever either swap argument is null.
+ try {
+ plan.swap(ops[0], null);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1092, pe.getErrorCode());
+ }
+
+ try {
+ plan.swap(null, ops[0]);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1092, pe.getErrorCode());
+ }
+
+ try {
+ plan.swap(null, null);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1092, pe.getErrorCode());
+ }
+ }
+
+ //Swap nodes that have multiple inputs and multiple outputs in a graph and check for exceptions
+ //Input
+ // S1 S2
+ // \ /
+ // M1
+ // |
+ // M2
+ // / \
+ // S3 S4
+ //Swap S1 and M1
+ //Swap M1 and S1
+ //Swap M1 and M2
+ //Swap M2 and M1
+ //Swap M2 and S3
+ //Swap S3 and M2
+ @Test
+ public void testNegativeSimpleSwap1() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[6];
+
+ // Upper half: S1, S2 -> M1.
+ ops[0] = new SingleOperator("1");
+ plan.add(ops[0]);
+ ops[1] = new SingleOperator("2");
+ plan.add(ops[1]);
+ ops[2] = new MultiOperator("3");
+ plan.add(ops[2]);
+ plan.connect(ops[0], ops[2]);
+ plan.connect(ops[1], ops[2]);
+
+ // Lower half: M2 -> S3, S4.
+ ops[3] = new SingleOperator("4");
+ plan.add(ops[3]);
+ ops[4] = new SingleOperator("5");
+ plan.add(ops[4]);
+ ops[5] = new MultiOperator("6");
+ plan.add(ops[5]);
+ plan.connect(ops[5], ops[3]);
+ plan.connect(ops[5], ops[4]);
+
+ // Join the halves: M1 -> M2.
+ plan.connect(ops[2], ops[5]);
+
+ // Every swap involving a multi-input or multi-output node must be
+ // rejected with error code 1093, in either argument order.
+ try {
+ plan.swap(ops[0], ops[2]);
+ fail("Expected exception for multi-input operator.");
+ } catch (PlanException pe) {
+ assertEquals(1093, pe.getErrorCode());
+ }
+
+ try {
+ plan.swap(ops[2], ops[0]);
+ fail("Expected exception for multi-input operator.");
+ } catch (PlanException pe) {
+ assertEquals(1093, pe.getErrorCode());
+ }
+
+ try {
+ plan.swap(ops[2], ops[5]);
+ fail("Expected exception for multi-input operator.");
+ } catch (PlanException pe) {
+ assertEquals(1093, pe.getErrorCode());
+ }
+
+ try {
+ plan.swap(ops[5], ops[2]);
+ fail("Expected exception for multi-output operator.");
+ } catch (PlanException pe) {
+ assertEquals(1093, pe.getErrorCode());
+ }
+
+ try {
+ plan.swap(ops[5], ops[3]);
+ fail("Expected exception for multi-output operator.");
+ } catch (PlanException pe) {
+ assertEquals(1093, pe.getErrorCode());
+ }
+
+ try {
+ plan.swap(ops[3], ops[5]);
+ fail("Expected exception for multi-output operator.");
+ } catch (PlanException pe) {
+ assertEquals(1093, pe.getErrorCode());
+ }
+ }
+
+ //Push M11 before M10's inputs - 0 through 3
+ //Input
+ // S1 S2 S3 S4
+ // \ | | /
+ // M10
+ // / | \
+ // S5 M11 S6
+ // / | \
+ // S7 S8 S9
+ //Output when pushed before 1st input
+ // S2
+ // |
+ // S1 M11 S3 S4
+ // \ | | /
+ // M10
+ // / / | \ \
+ // S5 S7 S8 S9 S6
+ @Test
+ public void testpushBefore() throws Exception {
+ // Push M11 in front of each of M10's four inputs in turn, on a fresh plan.
+ for(int index = 0; index < 4; index++) {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[11];
+
+ for(int i = 0; i < ops.length - 2; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i+1));
+ plan.add(ops[i]);
+ }
+
+ ops[9] = new MultiOperator("10");
+ plan.add(ops[9]);
+
+ ops[10] = new MultiOperator("11");
+ plan.add(ops[10]);
+
+ plan.connect(ops[0], ops[9]);
+ plan.connect(ops[1], ops[9]);
+ plan.connect(ops[2], ops[9]);
+ plan.connect(ops[3], ops[9]);
+ plan.connect(ops[9], ops[4]);
+ plan.connect(ops[9], ops[10]);
+ plan.connect(ops[9], ops[5]);
+
+ plan.connect(ops[10], ops[6]);
+ plan.connect(ops[10], ops[7]);
+ plan.connect(ops[10], ops[8]);
+
+ plan.pushBefore(ops[9], ops[10], index);
+
+ // Compare whole sets: the old retainAll/size check only proved the
+ // expected nodes were a subset of the roots (or leaves), not that
+ // M10/M11 had stopped being roots (or leaves).
+ Set<TOperator> expectedRoots = new HashSet<TOperator>();
+ for(int i = 0; i < 4; ++i) {
+ expectedRoots.add(ops[i]);
+ }
+ assertEquals(expectedRoots, new HashSet<TOperator>(plan.getRoots()));
+
+ Set<TOperator> expectedLeaves = new HashSet<TOperator>();
+ for(int i = 4; i < 9; ++i) {
+ expectedLeaves.add(ops[i]);
+ }
+ assertEquals(expectedLeaves, new HashSet<TOperator>(plan.getLeaves()));
+
+ // M11 now occupies input slot 'index' of M10 and is fed by the
+ // operator that previously held that slot.
+ List<TOperator> m10Predecessors = plan.getPredecessors(ops[9]);
+ assertEquals(ops[10], m10Predecessors.get(index));
+
+ List<TOperator> m11Predecessors = plan.getPredecessors(ops[10]);
+ assertEquals(ops[index], m11Predecessors.get(0));
+ }
+ }
+
+ //Push S5 and S6 before M10's input
+ //Input
+ // S1 S2 S3 S4
+ // \ | | /
+ // M10
+ // / | \
+ // S5 M11 S6
+ // / | \
+ // S7 S8 S9
+ //Output when pushed before 1st input
+ // S2
+ // |
+ // S1 S5 S3 S4
+ // \ | | /
+ // M10
+ // / \
+ // M11 S6
+ // / | \
+ // S7 S8 S9
+ @Test
+ public void testpushBefore2() throws Exception {
+ // outerIndex selects S5 (0) or S6 (1); index selects M10's input slot.
+ for(int outerIndex = 0; outerIndex < 2; outerIndex++) {
+ for(int index = 0; index < 2; index++) {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[11];
+
+ for(int i = 0; i < ops.length - 2; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i+1));
+ plan.add(ops[i]);
+ }
+
+ ops[9] = new MultiOperator("10");
+ plan.add(ops[9]);
+
+ ops[10] = new MultiOperator("11");
+ plan.add(ops[10]);
+
+ plan.connect(ops[0], ops[9]);
+ plan.connect(ops[1], ops[9]);
+ plan.connect(ops[2], ops[9]);
+ plan.connect(ops[3], ops[9]);
+ plan.connect(ops[9], ops[4]);
+ plan.connect(ops[9], ops[10]);
+ plan.connect(ops[9], ops[5]);
+
+ plan.connect(ops[10], ops[6]);
+ plan.connect(ops[10], ops[7]);
+ plan.connect(ops[10], ops[8]);
+
+ int secondNodeIndex = outerIndex + 4;
+ plan.pushBefore(ops[9], ops[secondNodeIndex], index);
+
+ // Exact set comparison, so extra roots/leaves are caught as well.
+ Set<TOperator> expectedRoots = new HashSet<TOperator>();
+ for(int i = 0; i < 4; ++i) {
+ expectedRoots.add(ops[i]);
+ }
+ assertEquals(expectedRoots, new HashSet<TOperator>(plan.getRoots()));
+
+ // The pushed node is no longer a leaf; every other S5..S9 still is.
+ Set<TOperator> expectedLeaves = new HashSet<TOperator>();
+ for(int leafIndex = 4; leafIndex < 9; ++leafIndex) {
+ if(leafIndex != secondNodeIndex) {
+ expectedLeaves.add(ops[leafIndex]);
+ }
+ }
+ assertEquals(expectedLeaves, new HashSet<TOperator>(plan.getLeaves()));
+
+ List<TOperator> outerIndexNodePredecessors = plan.getPredecessors(ops[secondNodeIndex]);
+ assertEquals(ops[index], outerIndexNodePredecessors.get(0));
+
+ List<TOperator> m10Predecessors = plan.getPredecessors(ops[9]);
+ assertEquals(ops[secondNodeIndex], m10Predecessors.get(index));
+ }
+ }
+ }
+
+ //Push non-existent nodes in a graph and check for exceptions
+ //Push S1 before S4
+ //Push S4 before S1
+ //Push S5 before S4
+ //Push S1 before null
+ //Push null before S1
+ //Push null before null
+ //Input
+ //S1->S2->S3 S4 S5
+ @Test
+ public void testNegativePushBefore() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[5];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ }
+
+ // Only the first three operators join the plan; ops[3] and ops[4] stay
+ // outside it so they can serve as "not in the plan" arguments.
+ for(int i = 0; i < ops.length - 2; ++i) {
+ plan.add(ops[i]);
+ }
+
+ plan.connect(ops[0], ops[1]);
+ plan.connect(ops[1], ops[2]);
+
+ try {
+ plan.pushBefore(ops[0], ops[3], 0);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ try {
+ plan.pushBefore(ops[3], ops[0], 0);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ try {
+ plan.pushBefore(ops[4], ops[3], 0);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ // Error code 1085 is expected whenever either operator argument is null.
+ try {
+ plan.pushBefore(ops[0], null, 0);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1085, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushBefore(null, ops[0], 0);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1085, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushBefore(null, null, 0);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1085, pe.getErrorCode());
+ }
+ }
+
+ //Negative test cases
+ //Input
+ // S1 S2 S3 S4
+ // \ | | /
+ // M10
+ // / | \
+ // S5 M11 S6
+ // / | \
+ // S7 S8 S9
+ @Test
+ public void testNegativePushBefore2() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[11];
+
+ for(int i = 0; i < ops.length - 2; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i+1));
+ plan.add(ops[i]);
+ }
+
+ ops[9] = new MultiOperator("10");
+ plan.add(ops[9]);
+
+ ops[10] = new MultiOperator("11");
+ plan.add(ops[10]);
+
+ plan.connect(ops[0], ops[9]);
+ plan.connect(ops[1], ops[9]);
+ plan.connect(ops[2], ops[9]);
+ plan.connect(ops[3], ops[9]);
+ plan.connect(ops[9], ops[4]);
+ plan.connect(ops[9], ops[10]);
+ plan.connect(ops[9], ops[5]);
+
+ plan.connect(ops[10], ops[6]);
+ plan.connect(ops[10], ops[7]);
+ plan.connect(ops[10], ops[8]);
+
+ // 1086: the first operator does not have multiple predecessors.
+ try {
+ plan.pushBefore(ops[0], ops[9], 0);
+ fail("Expected exception for first operator having null predecessors");
+ } catch (PlanException pe) {
+ assertEquals(1086, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushBefore(ops[10], ops[6], 0);
+ fail("Expected exception for first operator having one predecessor");
+ } catch (PlanException pe) {
+ assertEquals(1086, pe.getErrorCode());
+ }
+
+ // 1087: inputNum is out of range for the first operator's inputs.
+ try {
+ plan.pushBefore(ops[9], ops[10], 4);
+ fail("Expected exception for inputNum exceeding number of predecessors");
+ } catch (PlanException pe) {
+ assertEquals(1087, pe.getErrorCode());
+ }
+
+ // 1088: the second operator does not have exactly one predecessor.
+ try {
+ plan.pushBefore(ops[9], ops[0], 0);
+ fail("Expected exception for second operator having null predecessors");
+ } catch (PlanException pe) {
+ assertEquals(1088, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushBefore(ops[9], ops[9], 0);
+ fail("Expected exception for second operator having more than one predecessor");
+ } catch (PlanException pe) {
+ assertEquals(1088, pe.getErrorCode());
+ }
+
+ // 1089: the second operator is not a successor of the first.
+ try {
+ plan.pushBefore(ops[9], ops[8], 0);
+ fail("Expected exception for second operator not being a successor of first operator");
+ } catch (PlanException pe) {
+ assertEquals(1089, pe.getErrorCode());
+ }
+
+ // Leave M10 with M11 as its only successor, then replace it with an
+ // operator that cannot have multiple outputs.
+ plan.disconnect(ops[9], ops[4]);
+ plan.disconnect(ops[9], ops[5]);
+
+ MultiInputSingleOutputOperator miso = new MultiInputSingleOutputOperator("12");
+
+ plan.replace(ops[9], miso);
+
+ // 1091: the push would give the single-output first operator several outputs.
+ try {
+ plan.pushBefore(miso, ops[10], 0);
+ fail("Expected exception for trying to connect multiple outputs to the first operator");
+ } catch (PlanException pe) {
+ assertEquals(1091, pe.getErrorCode());
+ }
+ }
+
+ //Push M10 after M11's outputs - 0 through 2
+ //Input
+ // S1 S2 S3 S4
+ // \ | | /
+ // M10
+ // S5 | S6
+ // \ | /
+ // M11
+ // / | \
+ // S7 S8 S9
+ //Output when pushed after output 1 (0-based)
+ // S5 S1 S2 S3 S4 S6
+ // \ \ \ / / /
+ // M11
+ // / | \
+ // S7 M10 S9
+ // |
+ // S8
+
+ @Test
+ public void testpushAfter() throws Exception {
+ // Push M10 behind each of M11's three outputs in turn, on a fresh plan.
+ for(int index = 0; index < 3; index++) {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[11];
+
+ for(int i = 0; i < ops.length - 2; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i+1));
+ plan.add(ops[i]);
+ }
+
+ ops[9] = new MultiOperator("10");
+ plan.add(ops[9]);
+
+ ops[10] = new MultiOperator("11");
+ plan.add(ops[10]);
+
+ plan.connect(ops[0], ops[9]);
+ plan.connect(ops[1], ops[9]);
+ plan.connect(ops[2], ops[9]);
+ plan.connect(ops[3], ops[9]);
+ plan.connect(ops[9], ops[10]);
+ plan.connect(ops[4], ops[10]);
+ plan.connect(ops[5], ops[10]);
+
+ plan.connect(ops[10], ops[6]);
+ plan.connect(ops[10], ops[7]);
+ plan.connect(ops[10], ops[8]);
+
+ plan.pushAfter(ops[10], ops[9], index);
+
+ // Exact set comparison, so extra roots/leaves are caught as well.
+ Set<TOperator> expectedRoots = new HashSet<TOperator>();
+ for(int i = 0; i < 6; ++i) {
+ expectedRoots.add(ops[i]);
+ }
+ assertEquals(expectedRoots, new HashSet<TOperator>(plan.getRoots()));
+
+ Set<TOperator> expectedLeaves = new HashSet<TOperator>();
+ for(int i = 6; i < 9; ++i) {
+ expectedLeaves.add(ops[i]);
+ }
+ assertEquals(expectedLeaves, new HashSet<TOperator>(plan.getLeaves()));
+
+ // M10 now occupies output slot 'index' of M11 and feeds the operator
+ // that previously held that slot.
+ List<TOperator> m10Successors = plan.getSuccessors(ops[9]);
+ assertEquals(ops[index + 6], m10Successors.get(0));
+
+ List<TOperator> m11Successors = plan.getSuccessors(ops[10]);
+ assertEquals(ops[9], m11Successors.get(index));
+ }
+ }
+
+ //Push S5 and S6 after M11's outputs - 0 through 2
+ //Input
+ // S1 S2 S3 S4
+ // \ | | /
+ // M10
+ // S5 | S6
+ // \ | /
+ // M11
+ // / | \
+ // S7 S8 S9
+ //Output when S5 is pushed after 1st output
+ // S1 S2 S3 S4
+ // \ | | /
+ // M10
+ // | S6
+ // | /
+ // M11
+ // / | \
+ // S7 S5 S9
+ // |
+ // S8
+
+ @Test
+ public void testpushAfter1() throws Exception {
+ // outerIndex selects S5 (0) or S6 (1); index selects M11's output slot.
+ for(int outerIndex = 0; outerIndex < 2; outerIndex++) {
+ for(int index = 0; index < 3; index++) {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[11];
+
+ for(int i = 0; i < ops.length - 2; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i+1));
+ plan.add(ops[i]);
+ }
+
+ ops[9] = new MultiOperator("10");
+ plan.add(ops[9]);
+
+ ops[10] = new MultiOperator("11");
+ plan.add(ops[10]);
+
+ plan.connect(ops[0], ops[9]);
+ plan.connect(ops[1], ops[9]);
+ plan.connect(ops[2], ops[9]);
+ plan.connect(ops[3], ops[9]);
+ plan.connect(ops[9], ops[10]);
+ plan.connect(ops[4], ops[10]);
+ plan.connect(ops[5], ops[10]);
+
+ plan.connect(ops[10], ops[6]);
+ plan.connect(ops[10], ops[7]);
+ plan.connect(ops[10], ops[8]);
+
+ int secondNodeIndex = outerIndex + 4;
+ plan.pushAfter(ops[10], ops[secondNodeIndex], index);
+
+ // The pushed node is no longer a root; every other S1..S6 still is.
+ Set<TOperator> expectedRoots = new HashSet<TOperator>();
+ for(int rootIndex = 0; rootIndex < 6; ++rootIndex) {
+ if(rootIndex != secondNodeIndex) {
+ expectedRoots.add(ops[rootIndex]);
+ }
+ }
+ assertEquals(expectedRoots, new HashSet<TOperator>(plan.getRoots()));
+
+ Set<TOperator> expectedLeaves = new HashSet<TOperator>();
+ for(int i = 6; i < 9; ++i) {
+ expectedLeaves.add(ops[i]);
+ }
+ assertEquals(expectedLeaves, new HashSet<TOperator>(plan.getLeaves()));
+
+ List<TOperator> outerIndexNodeSuccessors = plan.getSuccessors(ops[secondNodeIndex]);
+ assertEquals(ops[index + 6], outerIndexNodeSuccessors.get(0));
+
+ List<TOperator> m11Successors = plan.getSuccessors(ops[10]);
+ assertEquals(ops[secondNodeIndex], m11Successors.get(index));
+ }
+ }
+ }
+
+ //Push non-existent nodes in a graph and check for exceptions
+ //Push S1 after S4
+ //Push S4 after S1
+ //Push S5 after S4
+ //Push S1 after null
+ //Push null after S1
+ //Push null after null
+ //Input
+ //S1->S2->S3 S4 S5
+ @Test
+ public void testNegativePushAfter() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[5];
+
+ for(int i = 0; i < ops.length; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i));
+ }
+
+ // Only the first three operators join the plan; ops[3] and ops[4] stay
+ // outside it so they can serve as "not in the plan" arguments.
+ for(int i = 0; i < ops.length - 2; ++i) {
+ plan.add(ops[i]);
+ }
+
+ plan.connect(ops[0], ops[1]);
+ plan.connect(ops[1], ops[2]);
+
+ try {
+ plan.pushAfter(ops[0], ops[3], 0);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ try {
+ plan.pushAfter(ops[3], ops[0], 0);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ try {
+ plan.pushAfter(ops[4], ops[3], 0);
+ fail("Expected exception for node not in plan.");
+ } catch (PlanException pe) {
+ assertTrue(pe.getMessage(), pe.getMessage().contains("not in the plan"));
+ }
+
+ // Error code 1085 is expected whenever either operator argument is null.
+ try {
+ plan.pushAfter(ops[0], null, 0);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1085, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushAfter(null, ops[0], 0);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1085, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushAfter(null, null, 0);
+ fail("Expected exception for having null as one of the inputs");
+ } catch (PlanException pe) {
+ assertEquals(1085, pe.getErrorCode());
+ }
+ }
+
+ //Negative test cases
+ //Input
+ // S1 S2 S3 S4
+ // \ | | /
+ // M10
+ // S5 | S6
+ // \ | /
+ // M11
+ // / | \
+ // S7 S8 S9
+ @Test
+ public void testNegativePushAfter2() throws Exception {
+ TPlan plan = new TPlan();
+ TOperator[] ops = new TOperator[11];
+
+ for(int i = 0; i < ops.length - 2; ++i) {
+ ops[i] = new SingleOperator(Integer.toString(i+1));
+ plan.add(ops[i]);
+ }
+
+ ops[9] = new MultiOperator("10");
+ plan.add(ops[9]);
+
+ ops[10] = new MultiOperator("11");
+ plan.add(ops[10]);
+
+ plan.connect(ops[0], ops[9]);
+ plan.connect(ops[1], ops[9]);
+ plan.connect(ops[2], ops[9]);
+ plan.connect(ops[3], ops[9]);
+ plan.connect(ops[9], ops[10]);
+ plan.connect(ops[4], ops[10]);
+ plan.connect(ops[5], ops[10]);
+
+ plan.connect(ops[10], ops[6]);
+ plan.connect(ops[10], ops[7]);
+ plan.connect(ops[10], ops[8]);
+
+ // 1086: the first operator does not have multiple successors.
+ try {
+ plan.pushAfter(ops[6], ops[9], 0);
+ fail("Expected exception for first operator having null successors");
+ } catch (PlanException pe) {
+ assertEquals(1086, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushAfter(ops[0], ops[9], 0);
+ fail("Expected exception for first operator having no inputs");
+ } catch (PlanException pe) {
+ assertEquals(1088, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushAfter(ops[9], ops[6], 0);
+ fail("Expected exception for first operator having one successor");
+ } catch (PlanException pe) {
+ assertEquals(1086, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushAfter(ops[6], ops[10], 0);
+ fail("Expected exception for first operator having one successors");
+ } catch (PlanException pe) {
+ assertEquals(1086, pe.getErrorCode());
+ }
+
+ // 1087: outputNum is out of range for the first operator's outputs.
+ try {
+ plan.pushAfter(ops[10], ops[9], 4);
+ fail("Expected exception for outputNum exceeding the number of outputs of first operator");
+ } catch (PlanException pe) {
+ assertEquals(1087, pe.getErrorCode());
+ }
+
+ // 1088: the second operator does not have exactly one successor.
+ try {
+ plan.pushAfter(ops[10], ops[6], 0);
+ fail("Expected exception for second operator having null successors");
+ } catch (PlanException pe) {
+ assertEquals(1088, pe.getErrorCode());
+ }
+
+ try {
+ plan.pushAfter(ops[10], ops[10], 0);
+ fail("Expected exception for second operator having more than one successor");
+ } catch (PlanException pe) {
+ assertEquals(1088, pe.getErrorCode());
+ }
+
+ // 1089: the second operator is not a predecessor of the first.
+ try {
+ plan.pushAfter(ops[10], ops[0], 0);
+ fail("Expected exception for second operator not being a predecessor of first operator");
+ } catch (PlanException pe) {
+ assertEquals(1089, pe.getErrorCode());
+ }
+
+ // Leave M11 with M10 as its only predecessor, then replace it with an
+ // operator that cannot have multiple inputs.
+ plan.disconnect(ops[4], ops[10]);
+ plan.disconnect(ops[5], ops[10]);
+
+ MultiOutputSingleInputOperator mosi = new MultiOutputSingleInputOperator("12");
+
+ plan.replace(ops[10], mosi);
+
+ // 1091: the push would give the single-input first operator several inputs.
+ try {
+ plan.pushAfter(mosi, ops[9], 0);
+ fail("Expected exception for trying to connect multiple inputs to the first operator");
+ } catch (PlanException pe) {
+ assertEquals(1091, pe.getErrorCode());
+ }
+ }
+
}