You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2012/04/03 16:42:32 UTC
svn commit: r1308953 - in
/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya:
interpretor/WorkflowInterpreter.java util/InterpreterUtil.java
Author: lahiru
Date: Tue Apr 3 14:42:31 2012
New Revision: 1308953
URL: http://svn.apache.org/viewvc?rev=1308953&view=rev
Log:
refactoring WorkflowInterpreter Code.
Modified:
incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java
Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java?rev=1308953&r1=1308952&r2=1308953&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java (original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java Tue Apr 3 14:42:31 2012
@@ -70,6 +70,7 @@ import org.apache.airavata.xbaya.compone
import org.apache.airavata.xbaya.concurrent.PredicatedTaskRunner;
import org.apache.airavata.xbaya.graph.ControlPort;
import org.apache.airavata.xbaya.graph.DataPort;
+import org.apache.airavata.xbaya.graph.Graph;
import org.apache.airavata.xbaya.graph.Node;
import org.apache.airavata.xbaya.graph.amazon.InstanceNode;
import org.apache.airavata.xbaya.graph.dynamic.BasicTypeMapping;
@@ -88,6 +89,7 @@ import org.apache.airavata.xbaya.graph.s
import org.apache.airavata.xbaya.graph.system.OutputNode;
import org.apache.airavata.xbaya.graph.system.gui.DifferedInputComponent;
import org.apache.airavata.xbaya.graph.system.gui.DifferedInputHandler;
+import org.apache.airavata.xbaya.graph.ws.WSGraph;
import org.apache.airavata.xbaya.graph.ws.WSNode;
import org.apache.airavata.xbaya.graph.ws.WSPort;
import org.apache.airavata.xbaya.gui.Cancelable;
@@ -128,25 +130,28 @@ public class WorkflowInterpreter {
private static final int SERVER_MODE = 2;
- private static final int MAXIMUM_RETRY_TIME = 2;
+
+// private static final int MAXIMUM_RETRY_TIME = 2;
public static final String WORKFLOW_STARTED = "Workflow Running";
public static final String WORKFLOW_FINISHED = "Workflow Finished";
private XBayaEngine engine;
- private Map<Node, Integer> retryCounter = new HashMap<Node, Integer>();
+// private Map<Node, Integer> retryCounter = new HashMap<Node, Integer>();
private Map<Node, Invoker> invokerMap = new HashMap<Node, Invoker>();
private WorkflowNotifiable notifier;
- private boolean retryFailed = false;
+// private boolean retryFailed = false;
private MyProxyChecker myProxyChecker;
private Workflow workflow;
+ private WSGraph graph;
+
private boolean isSubWorkflow;
private XBayaConfiguration configuration;
@@ -182,15 +187,14 @@ public class WorkflowInterpreter {
public WorkflowInterpreter(XBayaConfiguration configuration, String topic,
Workflow workflow, String username, String password) {
this.configuration = configuration;
- // this.isSubWorkflow = isSubWorkflow;
this.username = username;
this.password = password;
this.topic = topic;
this.workflow = workflow;
- this.notifier = new NotificationSender(
+ this.graph = workflow.getGraph();
+ this.notifier = new NotificationSender(
this.configuration.getBrokerURL(), topic);
this.mode = SERVER_MODE;
- this.retryFailed = false;
this.runWithCrossProduct = this.configuration.isRunWithCrossProduct();
}
@@ -212,6 +216,7 @@ public class WorkflowInterpreter {
this.password = password;
this.topic = topic;
this.workflow = workflow;
+ this.graph = workflow.getGraph();
if (this.isoffline) {
this.notifier = new StandaloneNotificationSender(topic,
this.workflow);
@@ -219,7 +224,6 @@ public class WorkflowInterpreter {
throw new Error("Cannot Initialize workflow with offline false");
}
this.mode = SERVER_MODE;
- this.retryFailed = false;
}
/**
@@ -249,6 +253,7 @@ public class WorkflowInterpreter {
this.configuration = engine.getConfiguration();
this.myProxyChecker = new MyProxyChecker(this.engine);
this.workflow = workflow;
+ this.graph = workflow.getGraph();
this.isSubWorkflow = subWorkflow;
this.mode = GUI_MODE;
this.notifier = new NotificationSender(
@@ -356,8 +361,8 @@ public class WorkflowInterpreter {
// and there are failed nodes then workflow is stuck because
// of failure
// so we should pause the execution
- if (getRunningNodeCountDynamically() == 0
- && getFailedNodeCountDynamically() != 0) {
+ if (InterpreterUtil.getRunningNodeCountDynamically(this.graph) == 0
+ && InterpreterUtil.getFailedNodeCountDynamically(this.graph) != 0) {
this.getWorkflow().setExecutionState(
XBayaExecutionState.PAUSED);
}
@@ -370,7 +375,7 @@ public class WorkflowInterpreter {
}
}
- if (getFailedNodeCountDynamically() == 0) {
+ if (InterpreterUtil.getFailedNodeCountDynamically(this.graph) == 0) {
if (actOnProvenance) {
try {
try {
@@ -623,7 +628,7 @@ public class WorkflowInterpreter {
private void finish() throws XBayaException {
ArrayList<Node> outoutNodes = new ArrayList<Node>();
- List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
+ List<NodeImpl> nodes = this.graph.getNodes();
for (Node node : nodes) {
if (node instanceof OutputNode) {
if (node.getInputPort(0).getFromNode().getGUI().getBodyColor() == NodeState.FINISHED.color) {
@@ -797,7 +802,7 @@ public class WorkflowInterpreter {
this.invokerMap.remove(invoker);
}
final WSNode wsNode = (WSNode) node;
- String wsdlLocation = this.getEPR(wsNode);
+ String wsdlLocation = InterpreterUtil.getEPR(wsNode);
final String gfacURLString = this.configuration.getGFacURL().toString();
if (null == wsdlLocation) {
if (gfacURLString.startsWith("https")) {
@@ -1349,7 +1354,7 @@ public class WorkflowInterpreter {
String gfacURLString, WSComponent wsComponent)
throws XBayaException {
Invoker invoker;
- String wsdlLocation = getEPR((WSNode) foreachWSNode);
+ String wsdlLocation = InterpreterUtil.getEPR((WSNode) foreachWSNode);
QName portTypeQName = wsComponent.getPortTypeQName();
if (null == wsdlLocation) {
if (gfacURLString.startsWith("https")) {
@@ -1603,33 +1608,7 @@ public class WorkflowInterpreter {
return invoker;
}
- private void setInputValuesForForEach(Node middleNode,
- LinkedList<String> listOfValues, Integer[] inputNumbers,
- Invoker invoker, int index) throws XBayaException {
- List<DataPort> inputPorts = middleNode.getInputPorts();
- int innerIndex = 0;
- for (DataPort port : inputPorts) {
- Object inputVal = InterpreterUtil.findInputFromPort(port,
- this.invokerMap);
-
- /*
- * Handle ForEachNode
- */
- Node fromNode = port.getFromNode();
- if (fromNode instanceof ForEachNode) {
- inputVal = listOfValues.get(inputNumbers[innerIndex++] % index);
- index++;
- }
-
- if (null == inputVal) {
- throw new WorkFlowInterpreterException(
- "Unable to find inputs for the node:"
- + middleNode.getID());
- }
- invoker.setInput(port.getName(), inputVal);
- }
- }
private List<String> createInputValues(LinkedList<String> listOfValues,
Integer[] inputNumbers) throws XBayaException {
@@ -1693,7 +1672,7 @@ public class WorkflowInterpreter {
private ArrayList<Node> getReadyOutputNodesDynamically() {
ArrayList<Node> list = new ArrayList<Node>();
- List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
+ List<NodeImpl> nodes = this.graph.getNodes();
for (Node node : nodes) {
if (node instanceof OutputNode
&& node.getGUI().getBodyColor() == NodeGUI.DEFAULT_BODY_COLOR
@@ -1707,12 +1686,8 @@ public class WorkflowInterpreter {
}
private int getRemainNodesDynamically() {
- int failed = 0;
- if (this.retryFailed) {
- failed = this.getFailedNodeCountDynamically();
- }
- return this.getWaitingNodeCountDynamically()
- + this.getRunningNodeCountDynamically() + failed;
+ return InterpreterUtil.getWaitingNodeCountDynamically(this.graph)
+ + InterpreterUtil.getRunningNodeCountDynamically(this.graph) ;
}
private ArrayList<Node> getInputNodesDynamically() {
@@ -1735,8 +1710,8 @@ public class WorkflowInterpreter {
private ArrayList<Node> getReadyNodesDynamically() {
ArrayList<Node> list = new ArrayList<Node>();
- ArrayList<Node> waiting = this.getWaitingNodesDynamically();
- ArrayList<Node> finishedNodes = this.getFinishedNodesDynamically();
+ ArrayList<Node> waiting = InterpreterUtil.getWaitingNodesDynamically(this.graph);
+ ArrayList<Node> finishedNodes = InterpreterUtil.getFinishedNodesDynamically(this.graph);
for (Node node : waiting) {
Component component = node.getComponent();
if (component instanceof WSComponent
@@ -1831,36 +1806,9 @@ public class WorkflowInterpreter {
}
}
- if (this.retryFailed) {
- /*
- * Calculate Rerun time for each failed Node
- */
- for (Node node2 : this.getFailedNodesDynamically()) {
- if (this.retryCounter.containsKey(node2)) {
- int rerunTimes = this.retryCounter.get(node2).intValue();
- if (rerunTimes < MAXIMUM_RETRY_TIME) {
- this.retryCounter.put(node2,
- Integer.valueOf(++rerunTimes));
- list.add(node2);
- } else {
- // if some component fail so many times, stop the
- // workflow
- if (this.mode == GUI_MODE) {
- this.notifyPause();
- } else {
- this.getWorkflow().setExecutionState(
- XBayaExecutionState.STOPPED);
- }
- }
- } else {
- this.retryCounter.put(node2, Integer.valueOf(1));
- list.add(node2);
- }
- }
- }
if (this.mode == GUI_MODE) {
- ArrayList<Node> waitingNodes = this.getWaitingNodesDynamically();
+ ArrayList<Node> waitingNodes = InterpreterUtil.getWaitingNodesDynamically(this.graph);
for (Node readyNode : waitingNodes) {
DifferedInputHandler.handleDifferredInputsofDependentNodes(
readyNode, engine);
@@ -1871,113 +1819,9 @@ public class WorkflowInterpreter {
}
- private ArrayList<Node> getFinishedNodesDynamically() {
- return this.getNodesWithBodyColor(NodeState.FINISHED.color);
- }
- private ArrayList<Node> getFailedNodesDynamically() {
- return this.getNodesWithBodyColor(NodeState.FAILED.color);
- }
- private ArrayList<Node> getWaitingNodesDynamically() {
- return this.getNodesWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR);
- }
- private ArrayList<Node> getNodesWithBodyColor(Color color) {
- ArrayList<Node> list = new ArrayList<Node>();
- List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
- for (Node node : nodes) {
- if (node.getGUI().getBodyColor() == color) {
- list.add(node);
- }
- }
- return list;
- }
-
- private int getRunningNodeCountDynamically() {
- return this.getNodeCountWithBodyColor(NodeState.EXECUTING.color);
- }
-
- private int getFailedNodeCountDynamically() {
- int failed = 0;
- /*
- * Take rerun time for each failed Node into consideration
- */
- for (Node node2 : this.getFailedNodesDynamically()) {
- if (this.retryCounter.containsKey(node2)) {
- int rerunTimes = this.retryCounter.get(node2).intValue();
- if (rerunTimes < MAXIMUM_RETRY_TIME) {
- failed++;
- }
- } else {
- failed++;
- }
- }
- return failed;
- }
-
- private int getWaitingNodeCountDynamically() {
- return this.getNodeCountWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR);
- }
-
- private int getNodeCountWithBodyColor(Color color) {
- int sum = 0;
- List<NodeImpl> nodes = this.getWorkflow().getGraph().getNodes();
- for (Node node : nodes) {
- if (node.getGUI().getBodyColor() == color) {
- ++sum;
- }
- }
- return sum;
- }
-
- private String getEPR(WSNode wsNode) {
- Iterable<WsdlService> services = wsNode.getComponent().getWSDL()
- .services();
- Iterator<WsdlService> iterator = services.iterator();
- if (iterator.hasNext()) {
- Iterable<WsdlPort> ports = iterator.next().ports();
- Iterator<WsdlPort> portIterator = ports.iterator();
- if (portIterator.hasNext()) {
- WsdlPort port = portIterator.next();
- Iterable children = port.xml().children();
- Iterator childIterator = children.iterator();
- while (childIterator.hasNext()) {
- Object next = childIterator.next();
- if (next instanceof XmlElementWithViewsImpl) {
- org.xmlpull.infoset.XmlAttribute epr = ((XmlElementWithViewsImpl) next)
- .attribute("location");
- return epr.getValue();
- }
- }
- }
- }
- return null;
- }
-
- private String getEPRForSubWorkflow(SubWorkflowNode subWorkflowNode) {
- Iterable<WsdlService> services = subWorkflowNode.getComponent()
- .getWSDL().services();
- Iterator<WsdlService> iterator = services.iterator();
- if (iterator.hasNext()) {
- Iterable<WsdlPort> ports = iterator.next().ports();
- Iterator<WsdlPort> portIterator = ports.iterator();
- if (portIterator.hasNext()) {
- WsdlPort port = portIterator.next();
- Iterable children = port.xml().children();
- Iterator childIterator = children.iterator();
- while (childIterator.hasNext()) {
- Object next = childIterator.next();
- if (next instanceof XmlElementWithViewsImpl) {
- org.xmlpull.infoset.XmlAttribute epr = ((XmlElementWithViewsImpl) next)
- .attribute("location");
- return epr.getValue();
- }
- }
- }
- }
- return null;
- }
public boolean isRunWithCrossProduct() {
return runWithCrossProduct;
Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java?rev=1308953&r1=1308952&r2=1308953&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java (original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/util/InterpreterUtil.java Tue Apr 3 14:42:31 2012
@@ -20,18 +20,18 @@
*/
package org.apache.airavata.xbaya.util;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.awt.*;
+import java.util.*;
import java.util.List;
-import java.util.Map;
import org.apache.airavata.xbaya.XBayaException;
import org.apache.airavata.xbaya.XBayaRuntimeException;
import org.apache.airavata.xbaya.graph.DataPort;
import org.apache.airavata.xbaya.graph.Node;
import org.apache.airavata.xbaya.graph.amazon.InstanceNode;
+import org.apache.airavata.xbaya.graph.gui.NodeGUI;
+import org.apache.airavata.xbaya.graph.impl.NodeImpl;
+import org.apache.airavata.xbaya.graph.subworkflow.SubWorkflowNode;
import org.apache.airavata.xbaya.graph.system.ConstantNode;
import org.apache.airavata.xbaya.graph.system.EndForEachNode;
import org.apache.airavata.xbaya.graph.system.EndifNode;
@@ -39,15 +39,21 @@ import org.apache.airavata.xbaya.graph.s
import org.apache.airavata.xbaya.graph.system.InputNode;
import org.apache.airavata.xbaya.graph.system.SystemDataPort;
import org.apache.airavata.xbaya.graph.system.gui.DifferedInputNode;
+import org.apache.airavata.xbaya.graph.ws.WSGraph;
+import org.apache.airavata.xbaya.graph.ws.WSNode;
import org.apache.airavata.xbaya.graph.ws.WSPort;
import org.apache.airavata.xbaya.interpretor.SystemComponentInvoker;
import org.apache.airavata.xbaya.interpretor.WorkFlowInterpreterException;
import org.apache.airavata.xbaya.invoker.GenericInvoker;
import org.apache.airavata.xbaya.invoker.Invoker;
import org.apache.airavata.xbaya.invoker.WorkflowInvokerWrapperForGFacInvoker;
+import org.apache.airavata.xbaya.monitor.gui.MonitorEventHandler;
import org.xmlpull.infoset.XmlElement;
+import org.xmlpull.infoset.impl.XmlElementWithViewsImpl;
import xsul5.XmlConstants;
+import xsul5.wsdl.WsdlPort;
+import xsul5.wsdl.WsdlService;
public class InterpreterUtil {
/**
@@ -345,4 +351,74 @@ public class InterpreterUtil {
}
return inputNumbers;
}
+ public static ArrayList<Node> getFinishedNodesDynamically(WSGraph graph) {
+ return getNodesWithBodyColor(MonitorEventHandler.NodeState.FINISHED.color,graph);
+ }
+
+ public static ArrayList<Node> getFailedNodesDynamically(WSGraph graph) {
+ return getNodesWithBodyColor(MonitorEventHandler.NodeState.FAILED.color,graph);
+ }
+
+ public static ArrayList<Node> getWaitingNodesDynamically(WSGraph graph) {
+ return getNodesWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR,graph);
+ }
+
+ public static ArrayList<Node> getNodesWithBodyColor(Color color,WSGraph graph) {
+ ArrayList<Node> list = new ArrayList<Node>();
+ List<NodeImpl> nodes = graph.getNodes();
+ for (Node node : nodes) {
+ if (node.getGUI().getBodyColor() == color) {
+ list.add(node);
+ }
+ }
+ return list;
+ }
+
+ public static int getRunningNodeCountDynamically(WSGraph graph) {
+ return getNodeCountWithBodyColor(MonitorEventHandler.NodeState.EXECUTING.color,graph);
+ }
+
+ public static int getFailedNodeCountDynamically(WSGraph graph) {
+ return getFailedNodesDynamically(graph).size();
+ }
+
+ public static int getWaitingNodeCountDynamically(WSGraph graph) {
+ return getNodeCountWithBodyColor(NodeGUI.DEFAULT_BODY_COLOR,graph);
+ }
+
+ public static int getNodeCountWithBodyColor(Color color,WSGraph graph) {
+ int sum = 0;
+ List<NodeImpl> nodes = graph.getNodes();
+ for (Node node : nodes) {
+ if (node.getGUI().getBodyColor() == color) {
+ ++sum;
+ }
+ }
+ return sum;
+ }
+
+ public static String getEPR(WSNode wsNode) {
+ Iterable<WsdlService> services = wsNode.getComponent().getWSDL()
+ .services();
+ Iterator<WsdlService> iterator = services.iterator();
+ if (iterator.hasNext()) {
+ Iterable<WsdlPort> ports = iterator.next().ports();
+ Iterator<WsdlPort> portIterator = ports.iterator();
+ if (portIterator.hasNext()) {
+ WsdlPort port = portIterator.next();
+ Iterable children = port.xml().children();
+ Iterator childIterator = children.iterator();
+ while (childIterator.hasNext()) {
+ Object next = childIterator.next();
+ if (next instanceof XmlElementWithViewsImpl) {
+ org.xmlpull.infoset.XmlAttribute epr = ((XmlElementWithViewsImpl) next)
+ .attribute("location");
+ return epr.getValue();
+ }
+ }
+ }
+ }
+ return null;
+ }
+
}