You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/01/09 17:49:14 UTC
svn commit: r1430956 - in /airavata/trunk/modules:
gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/
gfac-core/src/main/java/org/apache/airavata/core/gfac/
xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/
Author: lahiru
Date: Wed Jan 9 16:49:13 2013
New Revision: 1430956
URL: http://svn.apache.org/viewvc?rev=1430956&view=rev
Log:
fixing EmbeddedInvoker.
Modified:
airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java
airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java
airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
Modified: airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java?rev=1430956&r1=1430955&r2=1430956&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java (original)
+++ airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java Wed Jan 9 16:49:13 2013
@@ -203,7 +203,9 @@ public class GFacMessageReciever impleme
try {
gfacAPI = new GfacAPI();
- invocationContext = gfacAPI.gridJobSubmit(jobContext, (GFacConfiguration) context.getProperty(GFacService.GFAC_CONFIGURATION));
+ String workflowNodeId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowNodeId();
+ String workflowInstanceId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowInstanceId();
+ invocationContext = gfacAPI.gridJobSubmit(jobContext, (GFacConfiguration) context.getProperty(GFacService.GFAC_CONFIGURATION),workflowNodeId,workflowInstanceId);
/*
* Add notifiable object
*/
Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java?rev=1430956&r1=1430955&r2=1430956&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java Wed Jan 9 16:49:13 2013
@@ -46,9 +46,7 @@ public class GfacAPI {
"." + WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getExperimentId());
public static final String REPOSITORY_PROPERTIES = "airavata-server.properties";
- public DefaultInvocationContext gridJobSubmit(JobContext jobContext,GFacConfiguration gfacConfig) throws Exception {
- String workflowNodeId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowNodeId();
- String workflowInstanceId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowInstanceId();
+ public DefaultInvocationContext gridJobSubmit(JobContext jobContext,GFacConfiguration gfacConfig,String workflowNodeId,String workflowInstanceId) throws Exception {
WorkflowTrackingNotification workflowNotification = new WorkflowTrackingNotification(jobContext.getBrokerURL(),
jobContext.getTopic(),workflowNodeId,workflowInstanceId);
LoggingNotification loggingNotification = new LoggingNotification();
Modified: airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java?rev=1430956&r1=1430955&r2=1430956&view=diff
==============================================================================
--- airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java (original)
+++ airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java Wed Jan 9 16:49:13 2013
@@ -40,6 +40,7 @@ import javax.xml.stream.XMLStreamExcepti
import javax.xml.stream.XMLStreamReader;
import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.workflow.execution.context.WorkflowContextHeaderBuilder;
import org.apache.airavata.registry.api.exception.RegistryException;
import org.apache.airavata.common.utils.XMLUtil;
import org.apache.airavata.commons.gfac.type.ActualParameter;
@@ -63,6 +64,7 @@ import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.builder.XmlElement;
@@ -72,9 +74,9 @@ import xsul.wsif.WSIFMessage;
import xsul.wsif.impl.WSIFMessageElement;
import xsul.xwsif_runtime.WSIFClient;
-public class EmbeddedGFacInvoker implements Invoker{
+public class EmbeddedGFacInvoker implements Invoker {
- private static final Logger logger = LoggerFactory.getLogger(EmbeddedGFacInvoker.class);
+ private static final Logger logger = LoggerFactory.getLogger(EmbeddedGFacInvoker.class);
private String nodeID;
@@ -120,19 +122,15 @@ public class EmbeddedGFacInvoker impleme
private Object outPut;
- Map<Parameter,ActualParameter> actualParameters = new HashMap<Parameter,ActualParameter>();
+ Map<Parameter, ActualParameter> actualParameters = new HashMap<Parameter, ActualParameter>();
/**
* Creates an InvokerWithNotification.
*
* @param portTypeQName
- *
- * @param wsdlLocation
- * The URL of WSDL of the service to invoke
- * @param nodeID
- * The ID of the service
- * @param notifier
- * The notification sender
+ * @param wsdlLocation The URL of WSDL of the service to invoke
+ * @param nodeID The ID of the service
+ * @param notifier The notification sender
*/
public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, WorkflowNotifiable notifier) {
this(portTypeQName, wsdlLocation, nodeID, null, notifier);
@@ -142,18 +140,13 @@ public class EmbeddedGFacInvoker impleme
* Creates an InvokerWithNotification.
*
* @param portTypeQName
- *
- * @param wsdlLocation
- * The URL of WSDL of the service to invoke
- * @param nodeID
- * The ID of the service
- * @param gfacURL
- * The URL of GFac service.
- * @param notifier
- * The notification sender
+ * @param wsdlLocation The URL of WSDL of the service to invoke
+ * @param nodeID The ID of the service
+ * @param gfacURL The URL of GFac service.
+ * @param notifier The notification sender
*/
public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String gfacURL,
- WorkflowNotifiable notifier) {
+ WorkflowNotifiable notifier) {
this(portTypeQName, wsdlLocation, nodeID, null, gfacURL, notifier);
}
@@ -161,19 +154,14 @@ public class EmbeddedGFacInvoker impleme
* Creates an InvokerWithNotification.
*
* @param portTypeQName
- *
- * @param wsdlLocation
- * The URL of WSDL of the service to invoke
- * @param nodeID
- * The ID of the service
+ * @param wsdlLocation The URL of WSDL of the service to invoke
+ * @param nodeID The ID of the service
* @param messageBoxURL
- * @param gfacURL
- * The URL of GFac service.
- * @param notifier
- * The notification sender
+ * @param gfacURL The URL of GFac service.
+ * @param notifier The notification sender
*/
public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String messageBoxURL,
- String gfacURL, WorkflowNotifiable notifier) {
+ String gfacURL, WorkflowNotifiable notifier) {
this.nodeID = nodeID;
this.portTypeQName = portTypeQName;
this.wsdlLocation = wsdlLocation;
@@ -186,7 +174,6 @@ public class EmbeddedGFacInvoker impleme
}
/**
- *
* @param portTypeQName
* @param wsdl
* @param nodeID
@@ -220,7 +207,6 @@ public class EmbeddedGFacInvoker impleme
}
/**
- *
* @throws WorkflowException
*/
public void setup() throws WorkflowException {
@@ -231,20 +217,15 @@ public class EmbeddedGFacInvoker impleme
}
/**
- *
- * @param operationName
- * The name of the operation
+ * @param operationName The name of the operation
* @throws WorkflowException
*/
public void setOperation(String operationName) throws WorkflowException {
}
/**
- *
- * @param name
- * The name of the input parameter
- * @param value
- * The value of the input parameter
+ * @param name The name of the input parameter
+ * @param value The value of the input parameter
* @throws WorkflowException
*/
public void setInput(String name, Object value) throws WorkflowException {
@@ -255,16 +236,20 @@ public class EmbeddedGFacInvoker impleme
this.inputNames.add(name);
this.inputValues.add(value);
ServiceDescription serviceDescription = airavataAPI.getApplicationManager().getServiceDescription(this.serviceName);
- if(serviceDescription==null){
- throw new RegistryException(new Exception("Service Description not found in registry."));
+ if (serviceDescription == null) {
+ throw new RegistryException(new Exception("Service Description not found in registry."));
}
ServiceDescriptionType serviceDescriptionType = serviceDescription.getType();
for (Parameter parameter : serviceDescriptionType.getInputParametersArray()) {
//todo this implementation doesn't work when there are n number of nodes connecting .. need to fix
- XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(XMLUtil.xmlElementToString((XmlElement) value)));
- StAXOMBuilder builder = new StAXOMBuilder(reader);
- OMElement input = builder.getDocumentElement();
- actualParameters.put(parameter, GfacUtils.getInputActualParameter(parameter, input));
+ if (value instanceof XmlElement) {
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(XMLUtil.xmlElementToString((XmlElement) value)));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ OMElement input = builder.getDocumentElement();
+ actualParameters.put(parameter, GfacUtils.getInputActualParameter(parameter, input));
+ } else if (value instanceof String) {
+ actualParameters.put(parameter, GfacUtils.getInputActualParameter(parameter, AXIOMUtil.stringToOM("<value>" + value + "</value>")));
+ }
}
} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
@@ -284,98 +269,67 @@ public class EmbeddedGFacInvoker impleme
}
/**
- *
* @return
* @throws WorkflowException
*/
public synchronized boolean invoke() throws WorkflowException {
try {
- ExecutorService executor = Executors.newSingleThreadExecutor();
- this.result = executor.submit(new Callable<Boolean>() {
- @SuppressWarnings("boxing")
- public Boolean call() {
- try {
- JobContext jobContext = new JobContext(actualParameters,EmbeddedGFacInvoker.this.topic,
- EmbeddedGFacInvoker.this.serviceName,EmbeddedGFacInvoker.this.gfacURL);
- GFacConfiguration gFacConfiguration = new GFacConfiguration(EmbeddedGFacInvoker.this.configuration.getMyProxyServer(),
- EmbeddedGFacInvoker.this.configuration.getMyProxyUsername(),
- EmbeddedGFacInvoker.this.configuration.getMyProxyPassphrase(),EmbeddedGFacInvoker.this.configuration.getMyProxyLifetime(),
- EmbeddedGFacInvoker.this.airavataAPI, EmbeddedGFacInvoker.this.configuration.getTrustedCertLocation());
-
- GfacAPI gfacAPI1 = new GfacAPI();
- InvocationContext defaultInvocationContext = gfacAPI1.gridJobSubmit(jobContext, gFacConfiguration);
- ParameterContextImpl outputParamContext = (ParameterContextImpl) defaultInvocationContext
- .<ActualParameter>getMessageContext("output");
- if (outputParamContext.getNames().hasNext()) {
- /*
- * Process Output
- */
- OMFactory fac = OMAbstractFactory.getOMFactory();
- OMNamespace omNs = fac.createOMNamespace("http://ws.apache.org/axis2/xsd", "ns1");
- OMElement outputElement = fac.createOMElement("invokeResponse", omNs);
-
- for (Iterator<String> iterator = outputParamContext.getNames(); iterator.hasNext(); ) {
- String name = iterator.next();
- String outputString = outputParamContext.getValue(name).toXML().replaceAll("GFacParameter", name);
- XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(outputString));
- StAXOMBuilder builder = new StAXOMBuilder(reader);
- outputElement.addChild(builder.getDocumentElement());
- }
- // Send notification
- logger.info("outputMessage: " + outputElement.toString());
- outPut = new WSIFMessageElement(XMLUtil.stringToXmlElement3(outputElement.toStringWithConsume()));
- EmbeddedGFacInvoker.this.notifier.serviceFinished(new WSIFMessageElement((XmlElement)outPut));
- } else {
- // An implementation of WSIFMessage,
- // WSIFMessageElement, implements toString(), which
- // serialize the message XML.
- EmbeddedGFacInvoker.this.notifier.receivedFault(new WSIFMessageElement(XMLUtil.stringToXmlElement3("<Message>Invocation Failed</Message>")));
- EmbeddedGFacInvoker.this.failerSent = true;
- }
- return true;
- } catch (WorkflowException e) {
- logger.error(e.getMessage(), e);
- // An appropriate message has been set in the exception.
- EmbeddedGFacInvoker.this.notifier.invocationFailed(e.getMessage(), e);
- EmbeddedGFacInvoker.this.failerSent = true;
- throw new WorkflowRuntimeException(e);
- } catch (RuntimeException e) {
- logger.error(e.getMessage(), e);
- String message = "Error in invoking a service: " + EmbeddedGFacInvoker.this.serviceInformation;
- EmbeddedGFacInvoker.this.notifier.invocationFailed(message, e);
- EmbeddedGFacInvoker.this.failerSent = true;
- throw e;
- } catch (XMLStreamException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- } catch (Exception e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- return false;
+ JobContext jobContext = new JobContext(actualParameters, EmbeddedGFacInvoker.this.topic,
+ EmbeddedGFacInvoker.this.serviceName, EmbeddedGFacInvoker.this.gfacURL);
+ GFacConfiguration gFacConfiguration = new GFacConfiguration(EmbeddedGFacInvoker.this.configuration.getMyProxyServer(),
+ EmbeddedGFacInvoker.this.configuration.getMyProxyUsername(),
+ EmbeddedGFacInvoker.this.configuration.getMyProxyPassphrase(), EmbeddedGFacInvoker.this.configuration.getMyProxyLifetime(),
+ EmbeddedGFacInvoker.this.airavataAPI, EmbeddedGFacInvoker.this.configuration.getTrustedCertLocation());
+
+ GfacAPI gfacAPI1 = new GfacAPI();
+ InvocationContext defaultInvocationContext = gfacAPI1.gridJobSubmit(jobContext,
+ gFacConfiguration,this.nodeID,this.notifier.getWorkflowID().toASCIIString());
+ ParameterContextImpl outputParamContext = (ParameterContextImpl) defaultInvocationContext
+ .<ActualParameter>getMessageContext("output");
+ if (outputParamContext.getNames().hasNext()) {
+ /*
+ * Process Output
+ */
+ OMFactory fac = OMAbstractFactory.getOMFactory();
+ OMNamespace omNs = fac.createOMNamespace("http://ws.apache.org/axis2/xsd", "ns1");
+ OMElement outputElement = fac.createOMElement("invokeResponse", omNs);
+
+ for (Iterator<String> iterator = outputParamContext.getNames(); iterator.hasNext(); ) {
+ String name = iterator.next();
+ String outputString = outputParamContext.getValue(name).toXML().replaceAll("GFacParameter", name);
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(outputString));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ outputElement.addChild(builder.getDocumentElement());
}
- });
- // Kill the thread inside of executor. This is necessary for Jython
- // script to finish.
-// executor.shutdown();
+ // Send notification
+ logger.info("outputMessage: " + outputElement.toString());
+ outPut = new WSIFMessageElement(XMLUtil.stringToXmlElement3(outputElement.toStringWithConsume()));
+ EmbeddedGFacInvoker.this.notifier.serviceFinished(new WSIFMessageElement((XmlElement) outPut));
+ } else {
+ // An implementation of WSIFMessage,
+ // WSIFMessageElement, implements toString(), which
+ // serialize the message XML.
+ EmbeddedGFacInvoker.this.notifier.receivedFault(new WSIFMessageElement(XMLUtil.stringToXmlElement3("<Message>Invocation Failed</Message>")));
+ EmbeddedGFacInvoker.this.failerSent = true;
+ }
- // Let other threads know that job has been submitted.
- notifyAll();
// Check if the invocation itself fails. This happens immediately.
- try {
- this.result.get(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- } catch (TimeoutException e) {
- // The job is probably running fine.
- // The normal case.
- return true;
- } catch (ExecutionException e) {
- // The service-failed notification should have been sent
- // already.
- logger.error(e.getMessage(), e);
- String message = "Error in invoking a service: " + this.serviceInformation;
- throw new WorkflowException(message, e);
- }
+// try {
+// this.result.get(100, TimeUnit.MILLISECONDS);
+// } catch (InterruptedException e) {
+// logger.error(e.getMessage(), e);
+// } catch (TimeoutException e) {
+// // The job is probably running fine.
+// // The normal case.
+// return true;
+// } catch (ExecutionException e) {
+// // The service-failed notification should have been sent
+// // already.
+// logger.error(e.getMessage(), e);
+// String message = "Error in invoking a service: " + this.serviceInformation;
+// throw new WorkflowException(message, e);
+// }
} catch (RuntimeException e) {
logger.error(e.getMessage(), e);
String message = "Error in invoking a service: " + this.serviceInformation;
@@ -393,7 +347,6 @@ public class EmbeddedGFacInvoker impleme
}
/**
- *
* @throws WorkflowException
*/
@SuppressWarnings("boxing")
@@ -439,16 +392,14 @@ public class EmbeddedGFacInvoker impleme
}
/**
- *
- * @param name
- * The name of the output parameter
+ * @param name The name of the output parameter
* @return
* @throws WorkflowException
*/
public Object getOutput(String name) throws WorkflowException {
try {
waitToFinish();
- return outPut;
+ return outPut;
} catch (WorkflowException e) {
logger.error(e.getMessage(), e);
// An appropriate message has been set in the exception.
@@ -470,7 +421,6 @@ public class EmbeddedGFacInvoker impleme
}
/**
- *
* @return
* @throws WorkflowException
*/