You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2012/11/20 17:31:01 UTC
svn commit: r1411730 - in /airavata/trunk/modules:
airavata-client/src/main/java/org/apache/airavata/client/impl/
workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/
workflow-model/workflow-model-core/src/main/jav...
Author: samindaw
Date: Tue Nov 20 16:31:00 2012
New Revision: 1411730
URL: http://svn.apache.org/viewvc?rev=1411730&view=rev
Log:
refactoring the workflow interpreter - let the games begin
Modified:
airavata/trunk/modules/airavata-client/src/main/java/org/apache/airavata/client/impl/WorkflowManagerImpl.java
airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/Node.java
airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/subworkflow/SubWorkflowNode.java
airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
Modified: airavata/trunk/modules/airavata-client/src/main/java/org/apache/airavata/client/impl/WorkflowManagerImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/airavata-client/src/main/java/org/apache/airavata/client/impl/WorkflowManagerImpl.java?rev=1411730&r1=1411729&r2=1411730&view=diff
==============================================================================
--- airavata/trunk/modules/airavata-client/src/main/java/org/apache/airavata/client/impl/WorkflowManagerImpl.java (original)
+++ airavata/trunk/modules/airavata-client/src/main/java/org/apache/airavata/client/impl/WorkflowManagerImpl.java Tue Nov 20 16:31:00 2012
@@ -30,7 +30,6 @@ import org.apache.airavata.client.Airava
import org.apache.airavata.client.api.AiravataAPIInvocationException;
import org.apache.airavata.client.api.WorkflowManager;
import org.apache.airavata.common.utils.XMLUtil;
-import org.apache.airavata.registry.api.exception.RegistryException;
import org.apache.airavata.workflow.model.wf.Workflow;
import org.apache.airavata.workflow.model.wf.WorkflowData;
import org.apache.airavata.workflow.model.wf.WorkflowInput;
Modified: airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/Node.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/Node.java?rev=1411730&r1=1411729&r2=1411730&view=diff
==============================================================================
--- airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/Node.java (original)
+++ airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/Node.java Tue Nov 20 16:31:00 2012
@@ -204,6 +204,8 @@ public interface Node extends GraphPiece
public NodeExecutionState getState();
public void setState(NodeExecutionState state);
+
+// public void executeDynamically();
public void registerObserver(NodeObserver o);
public void removeObserver(NodeObserver o);
@@ -215,4 +217,5 @@ public interface Node extends GraphPiece
public static interface NodeObserver{
public void nodeUpdated(NodeUpdateType type);
}
+
}
\ No newline at end of file
Modified: airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/subworkflow/SubWorkflowNode.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/subworkflow/SubWorkflowNode.java?rev=1411730&r1=1411729&r2=1411730&view=diff
==============================================================================
--- airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/subworkflow/SubWorkflowNode.java (original)
+++ airavata/trunk/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/graph/subworkflow/SubWorkflowNode.java Tue Nov 20 16:31:00 2012
@@ -95,5 +95,5 @@ public class SubWorkflowNode extends Nod
return ports.get(index);
}
-
+
}
\ No newline at end of file
Modified: airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java?rev=1411730&r1=1411729&r2=1411730&view=diff
==============================================================================
--- airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java (original)
+++ airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/interpretor/WorkflowInterpreter.java Tue Nov 20 16:31:00 2012
@@ -202,9 +202,6 @@ public class WorkflowInterpreter {
}
// get task list and execute them
ArrayList<Node> readyNodes = this.getReadyNodesDynamically();
-// if(readyNodes.size() == 0){
-// this.getWorkflow().setExecutionState(WorkflowExecutionState.STOPPED);
-// }
for (Node node : readyNodes) {
if (node.isBreak()) {
this.notifyPause();
@@ -217,16 +214,7 @@ public class WorkflowInterpreter {
// want
// recalculate the execution stack
}
-//
-// boolean nodeOutputLoadedFromProvenance = false;
-// if (this.config.isActOnProvenance()) {
-// nodeOutputLoadedFromProvenance = readProvenance(node);
-// if (!nodeOutputLoadedFromProvenance) {
-// executeDynamically(node);
-// }
-// } else {
- executeDynamically(node);
-// }
+ executeDynamically(node);
if (this.getWorkflow().getExecutionState() == WorkflowExecutionState.STEP) {
this.getWorkflow().setExecutionState(WorkflowExecutionState.PAUSED);
break;
@@ -383,13 +371,6 @@ public class WorkflowInterpreter {
*/
public void raiseException(Throwable e) {
notifyViaInteractor(WorkflowExecutionMessage.EXECUTION_ERROR, e);
- // // throw new RuntimeException(e);
- // if (this.config.getMode() ==
- // WorkflowInterpreterConfiguration.GUI_MODE) {
- // this.config.getGUI().getErrorWindow().error(e);
- // } else {
- // throw new RuntimeException(e);
- // }
}
/**
@@ -400,19 +381,6 @@ public class WorkflowInterpreter {
throw new WorkflowRuntimeException("Cannot pause when not running");
}
notifyViaInteractor(WorkflowExecutionMessage.EXECUTION_STATE_CHANGED, WorkflowExecutionState.PAUSED);
- // if (this.config.getMode() ==
- // WorkflowInterpreterConfiguration.GUI_MODE) {
- //
- // if (this.getWorkflow().getExecutionState() ==
- // WorkflowExecutionState.RUNNING
- // || this.getWorkflow().getExecutionState() ==
- // WorkflowExecutionState.STEP) {
- // this.config.getGUI().getToolbar().getPlayAction()
- // .actionPerformed(null);
- // } else {
- // throw new WorkflowRuntimeException("Cannot pause when not running");
- // }
- // }
}
/**
@@ -421,15 +389,6 @@ public class WorkflowInterpreter {
public void cleanup() throws MonitorException {
this.getWorkflow().setExecutionState(WorkflowExecutionState.STOPPED);
notifyViaInteractor(WorkflowExecutionMessage.EXECUTION_CLEANUP, null);
- // if (this.config.getMode() ==
- // WorkflowInterpreterConfiguration.GUI_MODE) {
- // this.engine.resetWorkflowInterpreter();
- // try {
- // this.engine.getMonitor().stop();
- // } finally {
- // this.engine.getMonitor().reset();
- // }
- // }
}
private void sendOutputsDynamically() throws WorkflowException {
@@ -569,50 +528,55 @@ public class WorkflowInterpreter {
} else if (component instanceof EndifComponent) {
handleEndIf(node);
} else if (component instanceof DoWhileComponent) {
- // Executor thread is shutdown inside as thread gets killed when you
- // shutdown
- ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
- DoWhileHandler doWhileHandler = new DoWhileHandler((DoWhileNode) node, this.invokerMap, getWaitingNodesDynamically(),
- getFinishedNodesDynamically(), this, threadExecutor);
- threadExecutor.submit(doWhileHandler);
+ handleDoWhile(node);
}else if (component instanceof EndDoWhileComponent) {
// Component is handled in DoWhileHandler after eval condition
}
else if (component instanceof InstanceComponent) {
- if (AmazonCredential.getInstance().getAwsAccessKeyId().isEmpty() || AmazonCredential.getInstance().getAwsSecretAccessKey().isEmpty()) {
- throw new WorkFlowInterpreterException("Please set Amazon Credential before using EC2 Instance Component");
- }
- for (ControlPort ports : node.getControlOutPorts()) {
- ports.setConditionMet(true);
- }
+ handleAmazonInstance(node);
} else if (component instanceof TerminateInstanceComponent) {
- Object inputVal = InterpreterUtil.findInputFromPort(node.getInputPort(0), this.invokerMap);
- String instanceId = inputVal.toString();
- /*
- * Terminate instance
- */
- AmazonUtil.terminateInstances(instanceId);
-
- // set color to done
- node.setState(NodeExecutionState.FINISHED);
+ handleAmazonTerminateInstance(node);
} else {
throw new WorkFlowInterpreterException("Encountered Node that cannot be executed:" + node);
}
}
+ private void handleAmazonTerminateInstance(final Node node) // terminates the Amazon instance whose id arrives on the node's first input port
+ throws WorkflowException {
+ Object inputVal = InterpreterUtil.findInputFromPort(node.getInputPort(0), this.invokerMap); // value feeding input port 0, looked up through invokerMap
+ String instanceId = inputVal.toString(); // NOTE(review): NullPointerException if no value was found - confirm findInputFromPort never returns null
+ /*
+ * Terminate the instance identified by the id read from input port 0.
+ */
+ AmazonUtil.terminateInstances(instanceId);
+
+ // mark the node as FINISHED (original note: "set color to done" - presumably the GUI colors the node from this state; verify)
+ node.setState(NodeExecutionState.FINISHED);
+ }
+
+ private void handleAmazonInstance(final Node node) { // checks that Amazon credentials are set, then marks every control-out port of the node as met
+ if (AmazonCredential.getInstance().getAwsAccessKeyId().isEmpty() || AmazonCredential.getInstance().getAwsSecretAccessKey().isEmpty()) { // either the access key id or the secret key is empty
+ throw new WorkFlowInterpreterException("Please set Amazon Credential before using EC2 Instance Component"); // no throws clause on this method - presumably an unchecked exception; verify
+ }
+ for (ControlPort ports : node.getControlOutPorts()) { // "ports" is a single ControlPort per iteration despite the plural name
+ ports.setConditionMet(true);
+ }
+ }
+
+ private void handleDoWhile(final Node node) { // submits a DoWhileHandler for the node to a new single-thread executor
+ // The executor is not shut down here; it is handed to the DoWhileHandler, which per the
+ // original note shuts it down itself, since a shutdown here would kill the worker thread.
+ ExecutorService threadExecutor = Executors.newSingleThreadExecutor();
+ DoWhileHandler doWhileHandler = new DoWhileHandler((DoWhileNode) node, this.invokerMap, getWaitingNodesDynamically(), // cast assumes callers only pass DoWhileNode instances - verify
+ getFinishedNodesDynamically(), this, threadExecutor);
+ threadExecutor.submit(doWhileHandler); // returned Future is discarded: this call does not wait for the handler, and a failure inside it is not observed here
+ }
+
private void handleSubWorkComponent(Node node) throws WorkflowException {
notifyViaInteractor(WorkflowExecutionMessage.OPEN_SUBWORKFLOW, node);
- // if ((this.config.getMode() ==
- // WorkflowInterpreterConfiguration.GUI_MODE) && (node instanceof
- // SubWorkflowNodeGUI)) {
- // ((SubWorkflowNodeGUI)
- // NodeController.getGUI(node)).openWorkflowTab(this.config.getGUI());
- // }
// setting the inputs
Workflow subWorkflow = ((SubWorkflowNode) node).getWorkflow();
- // List<WSComponentPort> subWorkflowdInputs = new ODEClient()
- // .getInputs(subWorkflow);
ArrayList<Node> subWorkflowInputNodes = getInputNodes(subWorkflow);
List<DataPort> inputPorts = node.getInputPorts();
@@ -1092,26 +1056,6 @@ public class WorkflowInterpreter {
leadCtxHeader = XBayaUtil.buildLeadContextHeader(this.getWorkflow(), this.getConfig().getConfiguration(), new MonitorConfiguration(this
.getConfig().getConfiguration().getBrokerURL(), this.config.getTopic(), true, this.getConfig().getConfiguration()
.getMessageBoxURL()), foreachWSNode.getID(), null);
- // if (this.config.getMode() ==
- // WorkflowInterpreterConfiguration.GUI_MODE) {
- // leadCtxHeader = XBayaUtil.buildLeadContextHeader(
- // this.getWorkflow(),
- // this.getConfig().getConfiguration(),
- // new
- // MonitorConfiguration(this.getConfig().getConfiguration()
- // .getBrokerURL(), this.config.getTopic(), true,
- // this.getConfig().getConfiguration().getMessageBoxURL()),
- // foreachWSNode.getID(), null);
- // } else {
- // leadCtxHeader = XBayaUtil.buildLeadContextHeader(
- // this.getWorkflow(),
- // this.getConfig().getConfiguration(),
- // new
- // MonitorConfiguration(this.getConfig().getConfiguration()
- // .getBrokerURL(), this.config.getTopic(), true,
- // this.getConfig().getConfiguration().getMessageBoxURL()),
- // foreachWSNode.getID(), null);
- // }
} catch (URISyntaxException e) {
throw new WorkflowException(e);
}
@@ -1492,15 +1436,6 @@ public class WorkflowInterpreter {
}
notifyViaInteractor(WorkflowExecutionMessage.HANDLE_DEPENDENT_NODES_DIFFERED_INPUTS, this.getGraph());
- // if (this.config.getMode() ==
- // WorkflowInterpreterConfiguration.GUI_MODE) {
- // ArrayList<Node> waitingNodes =
- // InterpreterUtil.getWaitingNodesDynamically(this.getGraph());
- // for (Node readyNode : waitingNodes) {
- // DifferedInputHandler.handleDifferredInputsofDependentNodes(
- // readyNode, config.getGUI());
- // }
- // }
return list;