You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2011/12/18 18:53:00 UTC
svn commit: r1220494 -
/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/graph/ws/WSGraph.java
Author: chathura
Date: Sun Dec 18 17:53:00 2011
New Revision: 1220494
URL: http://svn.apache.org/viewvc?rev=1220494&view=rev
Log:
AIRAVATA-231
Removed the stream functionality from the XBaya system.
This removes CEPNODE, StreamJoinNode, MergeNode, and CEPSubworkflowNode.
https://issues.apache.org/jira/browse/AIRAVATA-231
Modified:
incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/graph/ws/WSGraph.java
Modified: incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/graph/ws/WSGraph.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/graph/ws/WSGraph.java?rev=1220494&r1=1220493&r2=1220494&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/graph/ws/WSGraph.java (original)
+++ incubator/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/graph/ws/WSGraph.java Sun Dec 18 17:53:00 2011
@@ -28,29 +28,16 @@ import java.util.List;
import org.apache.airavata.common.exception.UtilsException;
import org.apache.airavata.common.utils.XMLUtil;
-import org.apache.airavata.xbaya.XBayaConstants;
-import org.apache.airavata.xbaya.XBayaException;
-import org.apache.airavata.xbaya.XBayaRuntimeException;
-import org.apache.airavata.xbaya.component.dynamic.CepComponent;
-import org.apache.airavata.xbaya.component.system.InputComponent;
-import org.apache.airavata.xbaya.component.system.OutputComponent;
import org.apache.airavata.xbaya.graph.DataEdge;
import org.apache.airavata.xbaya.graph.DataPort;
import org.apache.airavata.xbaya.graph.GraphException;
import org.apache.airavata.xbaya.graph.GraphFactory;
import org.apache.airavata.xbaya.graph.GraphSchema;
import org.apache.airavata.xbaya.graph.Node;
-import org.apache.airavata.xbaya.graph.Port;
-import org.apache.airavata.xbaya.graph.dynamic.CepNode;
-import org.apache.airavata.xbaya.graph.dynamic.CombineMultipleStreamNode;
import org.apache.airavata.xbaya.graph.impl.EdgeImpl;
import org.apache.airavata.xbaya.graph.impl.GraphImpl;
-import org.apache.airavata.xbaya.graph.impl.NodeCloneFactory;
import org.apache.airavata.xbaya.graph.impl.NodeImpl;
-import org.apache.airavata.xbaya.graph.system.InputNode;
-import org.apache.airavata.xbaya.graph.system.gui.StreamSourceNode;
import org.apache.airavata.xbaya.graph.util.GraphUtil;
-import org.apache.airavata.xbaya.wf.Workflow;
import org.xmlpull.infoset.XmlElement;
public class WSGraph extends GraphImpl {
@@ -265,109 +252,10 @@ public class WSGraph extends GraphImpl {
return null;
}
- /**
- * @param subworkflowNodes
- * @throws GraphException
- */
- public void configureWithd(Workflow oldWorkflow, List<Node> subworkflowNodes) throws GraphException {
- HashMap<Node, Node> nodeMap = new HashMap<Node, Node>();
- for (Node node : subworkflowNodes) {
- nodeMap.put(node, node);
- }
-
- Workflow clone = oldWorkflow.clone();
- clone.setName(clone.getName() + "_subWorkflow");
- List<NodeImpl> allOldNodes = clone.getGraph().getNodes();
- for (NodeImpl nodeImpl : allOldNodes) {
- if (null == nodeMap.get(nodeImpl)) {
- clone.removeNode(nodeImpl);
- }
- }
-
- // clone nodes and set it to this graph
- for (Node node : subworkflowNodes) {
-
- nodeMap.put(node, NodeCloneFactory.clone(node, this));
-
- }
-
- for (Node node : subworkflowNodes) {
- Node newNode = nodeMap.get(node);
-
- List<DataPort> inputPorts = node.getInputPorts();
- for (DataPort dataPort : inputPorts) {
- DataPort newPort = newNode.getInputPort(dataPort.getID());
- Node fromNode = dataPort.getFromNode();
- Node newFromNode = nodeMap.get(fromNode);
- if (null == newFromNode) {
- // input to this subworkflow
- InputNode inputNode = new InputComponent().createNode(this);
- addEdge(inputNode.getOutputPort(0), newPort);
- } else {
- String fromPortID = dataPort.getFromPort().getID();
- addEdge(newFromNode.getOutputPort(fromPortID), newPort);
- }
- }
-
- List<DataPort> outputPorts = node.getOutputPorts();
- for (DataPort dataPort : outputPorts) {
- DataPort newPort = newNode.getOutputPort(dataPort.getID());
- List<Port> toNodes = dataPort.getToPorts();
- for (Port toPort : toNodes) {
- Node newToNode = nodeMap.get(toPort.getNode());
- if (null == newToNode) {
- // outputnode
- Node outputNode = new OutputComponent().createNode(this);
- addEdge(newPort, outputNode.getInputPort(0));
- } else {
- addEdge(newPort, newToNode.getInputPort(toPort.getID()));
- }
- }
- }
-
- }
-
- }
-
- public HashMap<String, LinkedList<Node>> labelIntroduceJoinsAndGetSubSets() throws XBayaException {
-
- parseGraph();
-
- labelGraph();
- HashMap<String, LinkedList<Node>> graphPartitionSets = getGraphPartitionSets();
- List<Node> joinRequiredNodes = GraphUtil.getJoinRequiredNodes(this);
- for (Node node : joinRequiredNodes) {
- CepNode newCEPJoinNode = (CepNode) new CepComponent().createNode(this);
- String newLabel = GraphUtil.introduceDynamicNode(node, newCEPJoinNode, this);
- LinkedList<Node> list = new LinkedList<Node>();
- list.add(newCEPJoinNode);
- graphPartitionSets.put(newLabel, list);
- }
-
- return graphPartitionSets;
- }
-
- /**
- * @throws XBayaException
- *
- */
- private void parseGraph() throws XBayaException {
- topologicalSort();
- LinkedList<CepNode> cepNodes = GraphUtil.getCEPNodes(this);
- for (CepNode node : cepNodes) {
- if (node instanceof CombineMultipleStreamNode) {
- if (((CombineMultipleStreamNode) node).getStreamName() == null) {
- throw new XBayaException("CombineMultipleStreamNode need to configure the queries:"
- + node.getName());
- }
- } else if (null == node.getQuery()) {
- throw new XBayaException("CepNode need to configure the queries:" + node.getName());
- }
- }
+
- }
/**
* @return
@@ -432,112 +320,5 @@ public class WSGraph extends GraphImpl {
return partiotionGraph;
}
- /**
- *
- */
- private void labelGraph() {
- LinkedList<Node> workQueue = new LinkedList<Node>();
-
- List<InputNode> inputNodes = GraphUtil.getInputNodes(this);
- for (InputNode inputNode : inputNodes) {
- inputNode.setLabel(XBayaConstants.STATIC_LABEL);
- List<DataPort> outputPorts = inputNode.getOutputPorts();
- for (DataPort dataPort : outputPorts) {
- workQueue.addAll(dataPort.getToNodes());
- List<DataEdge> edges = dataPort.getEdges();
- for (DataEdge dataEdge : edges) {
- dataEdge.setLabel(XBayaConstants.STATIC_LABEL);
- }
- }
- }
-
- LinkedList<StreamSourceNode> streamSourceNodes = GraphUtil.getStreamSourceNodes(this);
- for (StreamSourceNode input : streamSourceNodes) {
- String label = input.getlabel();
- List<DataPort> outputPorts = input.getOutputPorts();
- for (DataPort dataPort : outputPorts) {
- workQueue.addAll(dataPort.getToNodes());
- List<DataEdge> edges = dataPort.getEdges();
- for (DataEdge dataEdge : edges) {
- dataEdge.setLabel(label);
- }
- }
- }
-
- while (!workQueue.isEmpty()) {
- Node processingNode = workQueue.remove();
- if (!GraphUtil.isAllInputsConnected(processingNode)) {
- throw new XBayaRuntimeException("Disconnedted node inputs during labeling");
- }
- if (GraphUtil.isAllInputsLabeled(processingNode)) {
- String sameLabel = GraphUtil.isSameLabeledInput(processingNode);
- if (null != sameLabel) {
- // same label so its a regular node
-
- if (GraphUtil.isRegulerNode(processingNode)) {
- processingNode.setLabel(sameLabel);
- String nodeLabel = processingNode.getLabel();
-
- List<DataPort> outputPorts = processingNode.getOutputPorts();
- for (DataPort dataPort : outputPorts) {
- List<DataEdge> edges = dataPort.getEdges();
- for (DataEdge dataEdge : edges) {
- dataEdge.setLabel(nodeLabel);
- }
- workQueue.addAll(dataPort.getToNodes());
- }
- } else {
- processingNode.inventLabel(GraphUtil.getEncodedInputLabels(processingNode));
- String nodeOutLabel = processingNode.getLabel() + "_out";
-
- List<DataPort> outputPorts = processingNode.getOutputPorts();
- for (DataPort dataPort : outputPorts) {
- List<DataEdge> edges = dataPort.getEdges();
- for (DataEdge dataEdge : edges) {
- dataEdge.setLabel(nodeOutLabel);
- }
- workQueue.addAll(dataPort.getToNodes());
- }
- }
-
- } else {
- // may need to introduce a join
- // if its CEP node we assume it doesnt require
- // joins
- // Its hard to determine if the CEP node would
- // resolve the joins
- if (!GraphUtil.isCEPNode(processingNode)) {
- processingNode.setRequireJoin(true);
-
- }
- processingNode.inventLabel(GraphUtil.getEncodedInputLabels(processingNode));
- String nodeOutLabel = processingNode.getLabel() + "_out";
-
- List<DataPort> outputPorts = processingNode.getOutputPorts();
- for (DataPort dataPort : outputPorts) {
- List<DataEdge> edges = dataPort.getEdges();
- for (DataEdge dataEdge : edges) {
- dataEdge.setLabel(nodeOutLabel);
- }
- workQueue.addAll(dataPort.getToNodes());
- }
-
- }
- } else {
- workQueue.add(processingNode);
- }
- }
-
- // if work queue is empty yet all are not labeled graph is not connected
- // !!
- List<EdgeImpl> edges = this.getEdges();
- for (EdgeImpl edge : edges) {
- if (null == edge.getLabel()) {
- throw new XBayaRuntimeException(
- "Some edges were not labeled after labeling algorithm. Possibly partitioned graph");
- }
- }
-
- }
}
\ No newline at end of file