You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2012/04/03 16:42:32 UTC

svn commit: r1308953 - in /incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya: interpretor/WorkflowInterpreter.java util/InterpreterUtil.java

Author: lahiru
Date: Tue Apr  3 14:42:31 2012
New Revision: 1308953

URL: http://svn.apache.org/viewvc?rev=1308953&view=rev
Log:
refactoring WorkflowInterpreter Code.

Modified:
    incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
    incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java

Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java?rev=1308953&r1=1308952&r2=1308953&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java (original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java Tue Apr  3 14:42:31 2012
@@ -70,6 +70,7 @@ import org.apache.airavata.xbaya.compone
 import org.apache.airavata.xbaya.concurrent.PredicatedTaskRunner;
 import org.apache.airavata.xbaya.graph.ControlPort;
 import org.apache.airavata.xbaya.graph.DataPort;
+import org.apache.airavata.xbaya.graph.Graph;
 import org.apache.airavata.xbaya.graph.Node;
 import org.apache.airavata.xbaya.graph.amazon.InstanceNode;
 import org.apache.airavata.xbaya.graph.dynamic.BasicTypeMapping;
@@ -88,6 +89,7 @@ import org.apache.airavata.xbaya.graph.s
 import org.apache.airavata.xbaya.graph.system.OutputNode;
 import org.apache.airavata.xbaya.graph.system.gui.DifferedInputComponent;
 import org.apache.airavata.xbaya.graph.system.gui.DifferedInputHandler;
+import org.apache.airavata.xbaya.graph.ws.WSGraph;
 import org.apache.airavata.xbaya.graph.ws.WSNode;
 import org.apache.airavata.xbaya.graph.ws.WSPort;
 import org.apache.airavata.xbaya.gui.Cancelable;
@@ -128,25 +130,28 @@ public class WorkflowInterpreter {
 
 	private static final int SERVER_MODE = 2;
 
-	private static final int MAXIMUM_RETRY_TIME = 2;
+
+//	private static final int MAXIMUM_RETRY_TIME = 2;
 
 	public static final String WORKFLOW_STARTED = "Workflow Running";
 	public static final String WORKFLOW_FINISHED = "Workflow Finished";
 
 	private XBayaEngine engine;
 
-	private Map<Node, Integer> retryCounter = new HashMap<Node, Integer>();
+//	private Map<Node, Integer> retryCounter = new HashMap<Node, Integer>();
 
 	private Map<Node, Invoker> invokerMap = new HashMap<Node, Invoker>();
 
 	private WorkflowNotifiable notifier;
 
-	private boolean retryFailed = false;
+//	private boolean retryFailed = false;
 
 	private MyProxyChecker myProxyChecker;
 
 	private Workflow workflow;
 
+    private WSGraph graph;
+
 	private boolean isSubWorkflow;
 
 	private XBayaConfiguration configuration;
@@ -182,15 +187,14 @@ public class WorkflowInterpreter {
 	public WorkflowInterpreter(XBayaConfiguration configuration, String topic,
 			Workflow workflow, String username, String password) {
 		this.configuration = configuration;
-		// this.isSubWorkflow = isSubWorkflow;
 		this.username = username;
 		this.password = password;
 		this.topic = topic;
 		this.workflow = workflow;
-		this.notifier = new NotificationSender(
+        this.graph = workflow.getGraph();
+        this.notifier = new NotificationSender(
 				this.configuration.getBrokerURL(), topic);
 		this.mode = SERVER_MODE;
-		this.retryFailed = false;
 		this.runWithCrossProduct = this.configuration.isRunWithCrossProduct();
 	}
 
@@ -212,6 +216,7 @@ public class WorkflowInterpreter {
 		this.password = password;
 		this.topic = topic;
 		this.workflow = workflow;
+        this.graph = workflow.getGraph();
 		if (this.isoffline) {
 			this.notifier = new StandaloneNotificationSender(topic,
 					this.workflow);
@@ -219,7 +224,6 @@ public class WorkflowInterpreter {
 			throw new Error("Cannot Initialize workflow with offline false");
 		}
 		this.mode = SERVER_MODE;
-		this.retryFailed = false;
 	}
 
 	/**
@@ -249,6 +253,7 @@ public class WorkflowInterpreter {
 		this.configuration = engine.getConfiguration();
 		this.myProxyChecker = new MyProxyChecker(this.engine);
 		this.workflow = workflow;
+        this.graph = workflow.getGraph();
 		this.isSubWorkflow = subWorkflow;
 		this.mode = GUI_MODE;
 		this.notifier = new NotificationSender(
@@ -356,8 +361,8 @@ public class WorkflowInterpreter {
 					// and there are failed nodes then workflow is stuck because
 					// of failure
 					// so we should pause the execution
-					if (getRunningNodeCountDynamically() == 0
-							&& getFailedNodeCountDynamically() != 0) {
+					if (InterpreterUtil.getRunningNodeCountDynamically(this.graph) == 0
+							&& InterpreterUtil.getFailedNodeCountDynamically(this.graph) != 0) {
 						this.getWorkflow().setExecutionState(
 								XBayaExecutionState.PAUSED);
 					}
@@ -370,7 +375,7 @@ public class WorkflowInterpreter {
 				}
 			}
 
-			if (getFailedNodeCountDynamically() == 0) {
+			if (InterpreterUtil.getFailedNodeCountDynamically(this.graph) == 0) {
 				if (actOnProvenance) {
 					try {
 						try {
@@ -623,7 +628,7 @@ public class WorkflowInterpreter {
 
 	private void finish() throws XBayaException {
 		ArrayList<Node> outoutNodes = new ArrayList<Node>();
-		List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
+		List<NodeImpl> nodes = this.graph.getNodes();
 		for (Node node : nodes) {
 			if (node instanceof OutputNode) {
 				if (node.getInputPort(0).getFromNode().getGUI().getBodyColor() == NodeState.FINISHED.color) {
@@ -797,7 +802,7 @@ public class WorkflowInterpreter {
 			this.invokerMap.remove(invoker);
 		}
 		final WSNode wsNode = (WSNode) node;
-		String wsdlLocation = this.getEPR(wsNode);
+		String wsdlLocation = InterpreterUtil.getEPR(wsNode);
 		final String gfacURLString = this.configuration.getGFacURL().toString();
 		if (null == wsdlLocation) {
 			if (gfacURLString.startsWith("https")) {
@@ -1349,7 +1354,7 @@ public class WorkflowInterpreter {
 			String gfacURLString, WSComponent wsComponent)
 			throws XBayaException {
 		Invoker invoker;
-		String wsdlLocation = getEPR((WSNode) foreachWSNode);
+		String wsdlLocation = InterpreterUtil.getEPR((WSNode) foreachWSNode);
 		QName portTypeQName = wsComponent.getPortTypeQName();
 		if (null == wsdlLocation) {
 			if (gfacURLString.startsWith("https")) {
@@ -1603,33 +1608,7 @@ public class WorkflowInterpreter {
 		return invoker;
 	}
 
-	private void setInputValuesForForEach(Node middleNode,
-			LinkedList<String> listOfValues, Integer[] inputNumbers,
-			Invoker invoker, int index) throws XBayaException {
-		List<DataPort> inputPorts = middleNode.getInputPorts();
-		int innerIndex = 0;
-		for (DataPort port : inputPorts) {
-			Object inputVal = InterpreterUtil.findInputFromPort(port,
-					this.invokerMap);
-
-			/*
-			 * Handle ForEachNode
-			 */
-			Node fromNode = port.getFromNode();
-			if (fromNode instanceof ForEachNode) {
-				inputVal = listOfValues.get(inputNumbers[innerIndex++] % index);
-				index++;
-			}
-
-			if (null == inputVal) {
-				throw new WorkFlowInterpreterException(
-						"Unable to find inputs for the node:"
-								+ middleNode.getID());
-			}
-			invoker.setInput(port.getName(), inputVal);
-		}
 
-	}
 
 	private List<String> createInputValues(LinkedList<String> listOfValues,
 			Integer[] inputNumbers) throws XBayaException {
@@ -1693,7 +1672,7 @@ public class WorkflowInterpreter {
 
 	private ArrayList<Node> getReadyOutputNodesDynamically() {
 		ArrayList<Node> list = new ArrayList<Node>();
-		List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
+		List<NodeImpl> nodes = this.graph.getNodes();
 		for (Node node : nodes) {
 			if (node instanceof OutputNode
 					&& node.getGUI().getBodyColor() == NodeGUI.DEFAULT_BODY_COLOR
@@ -1707,12 +1686,8 @@ public class WorkflowInterpreter {
 	}
 
 	private int getRemainNodesDynamically() {
-		int failed = 0;
-		if (this.retryFailed) {
-			failed = this.getFailedNodeCountDynamically();
-		}
-		return this.getWaitingNodeCountDynamically()
-				+ this.getRunningNodeCountDynamically() + failed;
+		return InterpreterUtil.getWaitingNodeCountDynamically(this.graph)
+				+ InterpreterUtil.getRunningNodeCountDynamically(this.graph) ;
 	}
 
 	private ArrayList<Node> getInputNodesDynamically() {
@@ -1735,8 +1710,8 @@ public class WorkflowInterpreter {
 
 	private ArrayList<Node> getReadyNodesDynamically() {
 		ArrayList<Node> list = new ArrayList<Node>();
-		ArrayList<Node> waiting = this.getWaitingNodesDynamically();
-		ArrayList<Node> finishedNodes = this.getFinishedNodesDynamically();
+		ArrayList<Node> waiting = InterpreterUtil.getWaitingNodesDynamically(this.graph);
+		ArrayList<Node> finishedNodes = InterpreterUtil.getFinishedNodesDynamically(this.graph);
 		for (Node node : waiting) {
 			Component component = node.getComponent();
 			if (component instanceof WSComponent
@@ -1831,36 +1806,9 @@ public class WorkflowInterpreter {
 			}
 		}
 
-		if (this.retryFailed) {
-			/*
-			 * Calculate Rerun time for each failed Node
-			 */
-			for (Node node2 : this.getFailedNodesDynamically()) {
-				if (this.retryCounter.containsKey(node2)) {
-					int rerunTimes = this.retryCounter.get(node2).intValue();
-					if (rerunTimes < MAXIMUM_RETRY_TIME) {
-						this.retryCounter.put(node2,
-								Integer.valueOf(++rerunTimes));
-						list.add(node2);
-					} else {
-						// if some component fail so many times, stop the
-						// workflow
-						if (this.mode == GUI_MODE) {
-							this.notifyPause();
-						} else {
-							this.getWorkflow().setExecutionState(
-									XBayaExecutionState.STOPPED);
-						}
-					}
-				} else {
-					this.retryCounter.put(node2, Integer.valueOf(1));
-					list.add(node2);
-				}
-			}
-		}
 
 		if (this.mode == GUI_MODE) {
-			ArrayList<Node> waitingNodes = this.getWaitingNodesDynamically();
+			ArrayList<Node> waitingNodes = InterpreterUtil.getWaitingNodesDynamically(this.graph);
 			for (Node readyNode : waitingNodes) {
 				DifferedInputHandler.handleDifferredInputsofDependentNodes(
 						readyNode, engine);
@@ -1871,113 +1819,9 @@ public class WorkflowInterpreter {
 
 	}
 
-	private ArrayList<Node> getFinishedNodesDynamically() {
-		return this.getNodesWithBodyColor(NodeState.FINISHED.color);
-	}
 
-	private ArrayList<Node> getFailedNodesDynamically() {
-		return this.getNodesWithBodyColor(NodeState.FAILED.color);
-	}
 
-	private ArrayList<Node> getWaitingNodesDynamically() {
-		return this.getNodesWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR);
-	}
 
-	private ArrayList<Node> getNodesWithBodyColor(Color color) {
-		ArrayList<Node> list = new ArrayList<Node>();
-		List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
-		for (Node node : nodes) {
-			if (node.getGUI().getBodyColor() == color) {
-				list.add(node);
-			}
-		}
-		return list;
-	}
-
-	private int getRunningNodeCountDynamically() {
-		return this.getNodeCountWithBodyColor(NodeState.EXECUTING.color);
-	}
-
-	private int getFailedNodeCountDynamically() {
-		int failed = 0;
-		/*
-		 * Take rerun time for each failed Node into consideration
-		 */
-		for (Node node2 : this.getFailedNodesDynamically()) {
-			if (this.retryCounter.containsKey(node2)) {
-				int rerunTimes = this.retryCounter.get(node2).intValue();
-				if (rerunTimes < MAXIMUM_RETRY_TIME) {
-					failed++;
-				}
-			} else {
-				failed++;
-			}
-		}
-		return failed;
-	}
-
-	private int getWaitingNodeCountDynamically() {
-		return this.getNodeCountWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR);
-	}
-
-	private int getNodeCountWithBodyColor(Color color) {
-		int sum = 0;
-		List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
-		for (Node node : nodes) {
-			if (node.getGUI().getBodyColor() == color) {
-				++sum;
-			}
-		}
-		return sum;
-	}
-
-	private String getEPR(WSNode wsNode) {
-		Iterable<WsdlService> services = wsNode.getComponent().getWSDL()
-				.services();
-		Iterator<WsdlService> iterator = services.iterator();
-		if (iterator.hasNext()) {
-			Iterable<WsdlPort> ports = iterator.next().ports();
-			Iterator<WsdlPort> portIterator = ports.iterator();
-			if (portIterator.hasNext()) {
-				WsdlPort port = portIterator.next();
-				Iterable children = port.xml().children();
-				Iterator childIterator = children.iterator();
-				while (childIterator.hasNext()) {
-					Object next = childIterator.next();
-					if (next instanceof XmlElementWithViewsImpl) {
-						org.xmlpull.infoset.XmlAttribute epr = ((XmlElementWithViewsImpl) next)
-								.attribute("location");
-						return epr.getValue();
-					}
-				}
-			}
-		}
-		return null;
-	}
-
-	private String getEPRForSubWorkflow(SubWorkflowNode subWorkflowNode) {
-		Iterable<WsdlService> services = subWorkflowNode.getComponent()
-				.getWSDL().services();
-		Iterator<WsdlService> iterator = services.iterator();
-		if (iterator.hasNext()) {
-			Iterable<WsdlPort> ports = iterator.next().ports();
-			Iterator<WsdlPort> portIterator = ports.iterator();
-			if (portIterator.hasNext()) {
-				WsdlPort port = portIterator.next();
-				Iterable children = port.xml().children();
-				Iterator childIterator = children.iterator();
-				while (childIterator.hasNext()) {
-					Object next = childIterator.next();
-					if (next instanceof XmlElementWithViewsImpl) {
-						org.xmlpull.infoset.XmlAttribute epr = ((XmlElementWithViewsImpl) next)
-								.attribute("location");
-						return epr.getValue();
-					}
-				}
-			}
-		}
-		return null;
-	}
 
 	public boolean isRunWithCrossProduct() {
 		return runWithCrossProduct;

Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java?rev=1308953&r1=1308952&r2=1308953&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java (original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java Tue Apr  3 14:42:31 2012
@@ -20,18 +20,18 @@
 */
 package org.apache.airavata.xbaya.util;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.awt.*;
+import java.util.*;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.airavata.xbaya.XBayaException;
 import org.apache.airavata.xbaya.XBayaRuntimeException;
 import org.apache.airavata.xbaya.graph.DataPort;
 import org.apache.airavata.xbaya.graph.Node;
 import org.apache.airavata.xbaya.graph.amazon.InstanceNode;
+import org.apache.airavata.xbaya.graph.gui.NodeGUI;
+import org.apache.airavata.xbaya.graph.impl.NodeImpl;
+import org.apache.airavata.xbaya.graph.subworkflow.SubWorkflowNode;
 import org.apache.airavata.xbaya.graph.system.ConstantNode;
 import org.apache.airavata.xbaya.graph.system.EndForEachNode;
 import org.apache.airavata.xbaya.graph.system.EndifNode;
@@ -39,15 +39,21 @@ import org.apache.airavata.xbaya.graph.s
 import org.apache.airavata.xbaya.graph.system.InputNode;
 import org.apache.airavata.xbaya.graph.system.SystemDataPort;
 import org.apache.airavata.xbaya.graph.system.gui.DifferedInputNode;
+import org.apache.airavata.xbaya.graph.ws.WSGraph;
+import org.apache.airavata.xbaya.graph.ws.WSNode;
 import org.apache.airavata.xbaya.graph.ws.WSPort;
 import org.apache.airavata.xbaya.interpretor.SystemComponentInvoker;
 import org.apache.airavata.xbaya.interpretor.WorkFlowInterpreterException;
 import org.apache.airavata.xbaya.invoker.GenericInvoker;
 import org.apache.airavata.xbaya.invoker.Invoker;
 import org.apache.airavata.xbaya.invoker.WorkflowInvokerWrapperForGFacInvoker;
+import org.apache.airavata.xbaya.monitor.gui.MonitorEventHandler;
 import org.xmlpull.infoset.XmlElement;
 
+import org.xmlpull.infoset.impl.XmlElementWithViewsImpl;
 import xsul5.XmlConstants;
+import xsul5.wsdl.WsdlPort;
+import xsul5.wsdl.WsdlService;
 
 public class InterpreterUtil {
     /**
@@ -345,4 +351,74 @@ public class InterpreterUtil {
     }
         return inputNumbers;
     }
+    	public static ArrayList<Node> getFinishedNodesDynamically(WSGraph graph) {
+		return getNodesWithBodyColor(MonitorEventHandler.NodeState.FINISHED.color,graph);
+	}
+
+	public static ArrayList<Node> getFailedNodesDynamically(WSGraph graph) {
+		return getNodesWithBodyColor(MonitorEventHandler.NodeState.FAILED.color,graph);
+	}
+
+	public static ArrayList<Node> getWaitingNodesDynamically(WSGraph graph) {
+		return getNodesWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR,graph);
+	}
+
+	public static ArrayList<Node> getNodesWithBodyColor(Color color,WSGraph graph) {
+		ArrayList<Node> list = new ArrayList<Node>();
+		List<NodeImpl> nodes = graph.getNodes();
+		for (Node node : nodes) {
+			if (node.getGUI().getBodyColor() == color) {
+				list.add(node);
+			}
+		}
+		return list;
+	}
+
+	public static int getRunningNodeCountDynamically(WSGraph graph) {
+		return getNodeCountWithBodyColor(MonitorEventHandler.NodeState.EXECUTING.color,graph);
+	}
+
+	public static int getFailedNodeCountDynamically(WSGraph graph) {
+        return getFailedNodesDynamically(graph).size();
+	}
+
+	public static int getWaitingNodeCountDynamically(WSGraph graph) {
+		return getNodeCountWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR,graph);
+	}
+
+	public static int getNodeCountWithBodyColor(Color color,WSGraph graph) {
+		int sum = 0;
+		List<NodeImpl> nodes = graph.getNodes();
+		for (Node node : nodes) {
+			if (node.getGUI().getBodyColor() == color) {
+				++sum;
+			}
+		}
+		return sum;
+	}
+
+    public static String getEPR(WSNode wsNode) {
+		Iterable<WsdlService> services = wsNode.getComponent().getWSDL()
+				.services();
+		Iterator<WsdlService> iterator = services.iterator();
+		if (iterator.hasNext()) {
+			Iterable<WsdlPort> ports = iterator.next().ports();
+			Iterator<WsdlPort> portIterator = ports.iterator();
+			if (portIterator.hasNext()) {
+				WsdlPort port = portIterator.next();
+				Iterable children = port.xml().children();
+				Iterator childIterator = children.iterator();
+				while (childIterator.hasNext()) {
+					Object next = childIterator.next();
+					if (next instanceof XmlElementWithViewsImpl) {
+						org.xmlpull.infoset.XmlAttribute epr = ((XmlElementWithViewsImpl) next)
+								.attribute("location");
+						return epr.getValue();
+					}
+				}
+			}
+		}
+		return null;
+	}
+
 }