You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:18 UTC
[76/90] [abbrv] AIRAVATA-1124
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
new file mode 100644
index 0000000..267b838
--- /dev/null
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
@@ -0,0 +1,686 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.xbaya.invoker;
+
+import java.io.File;
+import java.io.StringReader;
+import java.net.URL;
+import java.util.*;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.common.workflow.execution.context.WorkflowContextHeaderBuilder;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
+import org.apache.airavata.credential.store.store.CredentialReaderFactory;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.RequestData;
+import org.apache.airavata.gfac.context.ApplicationContext;
+import org.apache.airavata.gfac.context.JobExecutionContext;
+import org.apache.airavata.gfac.context.MessageContext;
+import org.apache.airavata.gfac.context.security.GSISecurityContext;
+import org.apache.airavata.gfac.context.security.SSHSecurityContext;
+import org.apache.airavata.gfac.cpi.GFacImpl;
+import org.apache.airavata.gfac.ec2.AmazonSecurityContext;
+import org.apache.airavata.gfac.scheduler.HostScheduler;
+import org.apache.airavata.gfac.utils.GFacUtils;
+import org.apache.airavata.gsi.ssh.api.Cluster;
+import org.apache.airavata.gsi.ssh.api.SSHApiException;
+import org.apache.airavata.gsi.ssh.api.ServerInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
+import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.PBSCluster;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPasswordAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.impl.authentication.DefaultPublicKeyFileAuthentication;
+import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
+import org.apache.airavata.gsi.ssh.util.CommonUtils;
+import org.apache.airavata.registry.api.exception.RegistryException;
+import org.apache.airavata.schemas.gfac.*;
+import org.apache.airavata.schemas.wec.ContextHeaderDocument;
+import org.apache.airavata.schemas.wec.SecurityContextDocument;
+import org.apache.airavata.workflow.model.exceptions.WorkflowException;
+import org.apache.airavata.xbaya.XBayaConfiguration;
+import org.apache.airavata.xbaya.jython.lib.ServiceNotifiable;
+import org.apache.airavata.xbaya.jython.lib.WorkflowNotifiable;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmlpull.v1.builder.XmlElement;
+
+import xsul.wsdl.WsdlDefinitions;
+import xsul.wsif.WSIFMessage;
+import xsul.wsif.impl.WSIFMessageElement;
+import xsul.xwsif_runtime.WSIFClient;
+
+public class EmbeddedGFacInvoker implements Invoker {
+
+ private static final Logger logger = LoggerFactory.getLogger(EmbeddedGFacInvoker.class);
+
+ // ID of the workflow node; setup() passes it to the notifier as the service ID.
+ private String nodeID;
+
+ private QName portTypeQName;
+
+ private String wsdlLocation;
+
+ // Either the WSDL location or the serialized WSDL, depending on the constructor used; appended to error messages.
+ private String serviceInformation;
+
+ private String messageBoxURL;
+
+ private String gfacURL;
+
+ // NOTE(review): never assigned anywhere in this class, yet dereferenced by waitToFinish() and getOutputs().
+ private Invoker invoker;
+
+ // Only assigned by the constructor that takes an XBayaConfiguration; invoke() dereferences it unconditionally.
+ private XBayaConfiguration configuration;
+
+
+ // null until invoke() reaches its success path, where it becomes true; waitToFinish() waits while it is null.
+ private Boolean result;
+
+ // Per-node notification sender obtained from the WorkflowNotifiable passed to the constructor.
+ private ServiceNotifiable notifier;
+
+// private AiravataRegistry2 registry;
+
+ // Assigned by the WSDL-object constructor only; invoke() reads the topic from the configuration instead.
+ private String topic;
+
+ // Name used to look up the service, host and application descriptions through airavataAPI.
+ private String serviceName;
+
+ private AiravataAPI airavataAPI;
+ /**
+ * Input values in the order setInput() received them; kept parallel to inputNames and
+ * turned into the input message by getInParameters().
+ */
+ private List<Object> inputValues = new ArrayList<Object>();
+
+ /**
+ * Input names in the order setInput() received them; kept parallel to inputValues.
+ */
+ private List<String> inputNames = new ArrayList<String>();
+
+ // Only ever assigned false in this class; getOutput() checks it before notifying a failure.
+ boolean failerSent;
+
+ // Assigned by the WSDL-object constructor; not read anywhere in this class.
+ private WsdlDefinitions wsdlDefinitionObject;
+
+ // Output message built by invoke() from the GFac output parameters; read by getOutput().
+ private Object outPut;
+
+ // Handed to the job's input MessageContext by invoke().
+ // NOTE(review): the code that filled this map in getInParameters() is commented out, so it stays empty.
+ Map<String, Object> actualParameters = new LinkedHashMap<String, Object>();
+
+ /**
+ * Creates an EmbeddedGFacInvoker without a GFac URL or a message box URL.
+ * Delegates to the five-argument constructor, passing {@code null} as the GFac URL.
+ * <p>
+ * NOTE(review): this constructor chain never assigns airavataAPI, serviceName or configuration,
+ * all of which invoke() dereferences.
+ *
+ * @param portTypeQName The QName of the port type
+ * @param wsdlLocation The URL of WSDL of the service to invoke
+ * @param nodeID The ID of the service
+ * @param notifier The notification sender
+ */
+ public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, WorkflowNotifiable notifier) {
+ this(portTypeQName, wsdlLocation, nodeID, null, notifier);
+ }
+
+ /**
+ * Creates an EmbeddedGFacInvoker without a message box URL.
+ * Delegates to the six-argument constructor, passing {@code null} as the message box URL.
+ *
+ * @param portTypeQName The QName of the port type
+ * @param wsdlLocation The URL of WSDL of the service to invoke
+ * @param nodeID The ID of the service
+ * @param gfacURL The URL of GFac service.
+ * @param notifier The notification sender
+ */
+ public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String gfacURL,
+ WorkflowNotifiable notifier) {
+ this(portTypeQName, wsdlLocation, nodeID, null, gfacURL, notifier);
+ }
+
+    /**
+     * Creates an EmbeddedGFacInvoker for a service identified by the location of its WSDL.
+     * The WSDL location doubles as the service information quoted in error messages.
+     *
+     * @param portTypeQName The QName of the port type
+     * @param wsdlLocation  The URL of WSDL of the service to invoke
+     * @param nodeID        The ID of the service; also used to create the per-node notification sender
+     * @param messageBoxURL The URL of the message box, may be null
+     * @param gfacURL       The URL of GFac service, may be null
+     * @param notifier      The notification sender factory; must not be null
+     */
+    public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String messageBoxURL,
+            String gfacURL, WorkflowNotifiable notifier) {
+        // Identity of the node and of the service behind it.
+        this.portTypeQName = portTypeQName;
+        this.nodeID = nodeID;
+
+        // The location is kept twice: once as-is and once as the text used in error messages.
+        this.wsdlLocation = wsdlLocation;
+        this.serviceInformation = wsdlLocation;
+
+        // Endpoints.
+        this.gfacURL = gfacURL;
+        this.messageBoxURL = messageBoxURL;
+
+        // Notification state.
+        this.failerSent = false;
+        this.notifier = notifier.createServiceNotificationSender(nodeID);
+    }
+
+    /**
+     * Creates an EmbeddedGFacInvoker from an in-memory WSDL definition. This is the only constructor
+     * that supplies the Airavata API, the service name and the XBaya configuration that invoke() uses.
+     *
+     * @param portTypeQName The QName of the port type
+     * @param wsdl          The WSDL definition; its serialized form is kept as the service information
+     * @param nodeID        The ID of the service; also used to create the per-node notification sender
+     * @param messageBoxURL The URL of the message box
+     * @param gfacURL       The URL of GFac service
+     * @param notifier      The notification sender factory; must not be null
+     * @param topic         The notification topic
+     * @param airavataAPI   The API used to look up service, host and application descriptions
+     * @param serviceName   The name of the service to look up and run
+     * @param config        The XBaya configuration
+     */
+    public EmbeddedGFacInvoker(QName portTypeQName,
+                               WsdlDefinitions wsdl,
+                               String nodeID,
+                               String messageBoxURL,
+                               String gfacURL,
+                               WorkflowNotifiable notifier,
+                               String topic,
+                               AiravataAPI airavataAPI,
+                               String serviceName,
+                               XBayaConfiguration config) {
+        // Serialize first, exactly as before, so a bad WSDL fails ahead of any notifier interaction.
+        this.serviceInformation = xsul.XmlConstants.BUILDER.serializeToString(wsdl);
+        this.wsdlDefinitionObject = wsdl;
+
+        this.portTypeQName = portTypeQName;
+        this.nodeID = nodeID;
+        this.serviceName = serviceName;
+
+        this.gfacURL = gfacURL;
+        this.messageBoxURL = messageBoxURL;
+
+        this.airavataAPI = airavataAPI;
+        this.configuration = config;
+
+        this.topic = topic;
+        this.failerSent = false;
+        this.notifier = notifier.createServiceNotificationSender(nodeID);
+    }
+
+ /**
+ * Prepares the invoker by registering this node's ID as the service ID on the notifier.
+ *
+ * @throws WorkflowException declared by the signature; nothing in this body throws it directly
+ */
+ public void setup() throws WorkflowException {
+ this.notifier.setServiceID(this.nodeID);
+ }
+
+ // Intentionally empty; not called from anywhere in this class.
+ private void setup(WsdlDefinitions definitions) throws WorkflowException {
+ }
+
+ /**
+ * Does nothing in this invoker: the body is empty and the operation name is ignored.
+ *
+ * @param operationName The name of the operation
+ * @throws WorkflowException declared by the signature; never thrown by this implementation
+ */
+ public void setOperation(String operationName) throws WorkflowException {
+ }
+
+    /**
+     * Records one input parameter. Names and values are appended to two parallel lists that
+     * getInParameters() later turns into the input message.
+     *
+     * @param name  The name of the input parameter
+     * @param value The value of the input parameter
+     * @throws WorkflowException if the input could not be recorded; the notifier is informed first
+     */
+    public void setInput(String name, Object value) throws WorkflowException {
+        try {
+            // Only pay for (and risk failing in) the XML serialization when it will actually be logged.
+            if (logger.isDebugEnabled() && value instanceof XmlElement) {
+                logger.debug("value: " + XMLUtil.xmlElementToString((XmlElement) value));
+            }
+            this.inputNames.add(name);
+            this.inputValues.add(value);
+        } catch (Error e) {
+            logger.error(e.getMessage(), e);
+            String message = "Unexpected error: " + this.serviceInformation;
+            this.notifier.invocationFailed(message, e);
+            throw new WorkflowException(message, e);
+        } catch (Exception e) {
+            // Checked exceptions used to be printed and swallowed here, which silently dropped the
+            // input. They are now reported the same way as runtime failures.
+            logger.error(e.getMessage(), e);
+            String message = "Error in setting an input. name: " + name + " value: " + value;
+            this.notifier.invocationFailed(message, e);
+            throw new WorkflowException(message, e);
+        }
+    }
+
+ /**
+ * @return
+ * @throws WorkflowException
+ */
+ public synchronized boolean invoke() throws WorkflowException {
+ try {
+ ContextHeaderDocument.ContextHeader contextHeader =
+ WorkflowContextHeaderBuilder.removeOtherSchedulingConfig(nodeID, this.configuration.getContextHeader());
+ String hostName = null;
+ HostDescription registeredHost;
+ if (contextHeader != null) {
+ if (contextHeader.getWorkflowSchedulingContext() != null &&
+ contextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray().length > 0 &&
+ contextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray(0).getHostName() != null) {
+ hostName = contextHeader.getWorkflowSchedulingContext().getApplicationSchedulingContextArray(0).getHostName();
+ }
+ }
+ //todo This is the basic scheduling, have to do proper scheduling implementation by implementing HostScheduler interface
+ ServiceDescription serviceDescription = airavataAPI.getApplicationManager().getServiceDescription(serviceName);
+ if (hostName == null) {
+ List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+ Map<String, ApplicationDescription> applicationDescriptors = airavataAPI.getApplicationManager().getApplicationDescriptors(serviceName);
+ for (String hostDescName : applicationDescriptors.keySet()) {
+ registeredHosts.add(airavataAPI.getApplicationManager().getHostDescription(hostDescName));
+ }
+ Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+ HostScheduler hostScheduler = aClass.newInstance();
+ registeredHost = hostScheduler.schedule(registeredHosts);
+ } else {
+ // if user specify a host, no matter what we pick that host for all the nodes, todo: allow users to specify node specific host
+ registeredHost = airavataAPI.getApplicationManager().getHostDescription(hostName);
+ }
+ ApplicationDescription applicationDescription =
+ airavataAPI.getApplicationManager().getApplicationDescription(serviceName, registeredHost.getType().getHostName());
+
+ // When we run getInParameters we set the actualParameter object, this has to be fixed
+ URL resource = EmbeddedGFacInvoker.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+ OMElement inputMessage = getInParameters();
+ Object wsifMessageElement = new WSIFMessageElement(XMLUtil.stringToXmlElement3(inputMessage.toStringWithConsume()));
+ this.notifier.invokingService(new WSIFMessageElement((XmlElement) wsifMessageElement));
+ Properties configurationProperties = ServerSettings.getProperties();
+ GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
+
+ JobExecutionContext jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+ //Here we get only the contextheader information sent specific for this node
+ //Add security context
+
+ //FIXME - We no longer using job execution context
+// jobExecutionContext.setContextHeader(WorkflowContextHeaderBuilder.removeOtherSchedulingConfig(nodeID, configuration.getContextHeader()));
+
+
+ jobExecutionContext.setProperty(Constants.PROP_WORKFLOW_NODE_ID, this.nodeID);
+ jobExecutionContext.setProperty(Constants.PROP_TOPIC, this.configuration.getTopic());
+ jobExecutionContext.setProperty(Constants.PROP_BROKER_URL, this.configuration.getBrokerURL().toASCIIString());
+ jobExecutionContext.setProperty(Constants.PROP_WORKFLOW_INSTANCE_ID, this.configuration.getTopic());
+
+
+ ApplicationContext applicationContext = new ApplicationContext();
+ applicationContext.setApplicationDeploymentDescription(applicationDescription);
+ applicationContext.setHostDescription(registeredHost);
+ applicationContext.setServiceDescription(serviceDescription);
+
+ jobExecutionContext.setApplicationContext(applicationContext);
+
+ jobExecutionContext.setOutMessageContext(getOutParameters(serviceDescription));
+ jobExecutionContext.setInMessageContext(new MessageContext(actualParameters));
+
+ addSecurityContext(registeredHost, configurationProperties, jobExecutionContext,
+ configuration.getContextHeader());
+ GFacImpl gfacAPI1 = new GFacImpl();
+ gfacAPI1.submitJob(jobExecutionContext);
+
+ OMFactory fac = OMAbstractFactory.getOMFactory();
+ OMNamespace omNs = fac.createOMNamespace("http://ws.apache.org/axis2/xsd", "ns1");
+ OMElement outputElement = fac.createOMElement("invokeResponse", omNs);
+ MessageContext outMessageContext = jobExecutionContext.getOutMessageContext();
+ Set<String> paramNames = outMessageContext.getParameters().keySet();
+ for (String paramName : paramNames) {
+ /*
+ * Process Output
+ */
+ String outputString = ((ActualParameter) outMessageContext.getParameter(paramName)).toXML().replaceAll("GFacParameter", paramName);
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(outputString));
+ StAXOMBuilder builder = new StAXOMBuilder(reader);
+ outputElement.addChild(builder.getDocumentElement());
+ }
+ // Send notification
+ logger.debug("outputMessage: " + outputElement.toString());
+ outPut = new WSIFMessageElement(XMLUtil.stringToXmlElement3(outputElement.toStringWithConsume()));
+ this.result = true;
+ EmbeddedGFacInvoker.this.notifier.serviceFinished(new WSIFMessageElement((XmlElement) outPut));
+ //todo check whether ActualParameter values are set or not, if they are null have to through an error or handle this in gfac level.
+// {
+// // An implementation of WSIFMessage,
+// // WSIFMessageElement, implements toString(), which
+// // serialize the message XML.
+// EmbeddedGFacInvoker.this.notifier.receivedFault(new WSIFMessageElement(XMLUtil.stringToXmlElement3("<Message>Invocation Failed</Message>")));
+// EmbeddedGFacInvoker.this.failerSent = true;
+// }
+
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error in invoking a service: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Exception e) {
+ this.notifier.invocationFailed(e.getMessage(), e);
+ throw new WorkflowException(e.getMessage(), e);
+ }
+ return true;
+ }
+
+    /**
+     * Extracts the credential management service settings from a context header.
+     *
+     * @param contextHeader the context header to inspect, may be null
+     * @return the credential management service when it is present and carries both a token id and a
+     *         portal user; {@code null} in every other case, including a header without a security context
+     */
+    private SecurityContextDocument.SecurityContext.CredentialManagementService getCredentialManagementService(
+            ContextHeaderDocument.ContextHeader contextHeader) {
+
+        // A header without a security context used to end in a NullPointerException here.
+        if (contextHeader == null || contextHeader.getSecurityContext() == null) {
+            return null;
+        }
+
+        SecurityContextDocument.SecurityContext.CredentialManagementService credentialManagementService
+                = contextHeader.getSecurityContext().getCredentialManagementService();
+        if (credentialManagementService == null) {
+            return null;
+        }
+
+        // Make sure token id and portal user id is properly populated
+        boolean populated = credentialManagementService.getTokenId() != null
+                && credentialManagementService.getPortalUser() != null;
+        return populated ? credentialManagementService : null;
+    }
+
+ private void addSecurityContext(HostDescription registeredHost, Properties configurationProperties,
+ JobExecutionContext jobExecutionContext, ContextHeaderDocument.ContextHeader contextHeader) throws WorkflowException {
+ RequestData requestData;
+ /* todo fix the credential store and uncomment following code block
+ SecurityContextDocument.SecurityContext.CredentialManagementService credentialManagementService
+ = getCredentialManagementService(contextHeader);
+
+ GSISecurityContext context;
+
+
+ if (credentialManagementService != null) {
+ String gatewayId = credentialManagementService.getGatewayId();
+ String tokenId
+ = credentialManagementService.getTokenId();
+ String portalUser = credentialManagementService.getPortalUser();
+
+ requestData = new RequestData(tokenId, portalUser, gatewayId);
+ } else {
+ requestData = new RequestData("default");
+ }
+
+ try {
+ context = new GSISecurityContext(CredentialReaderFactory.createCredentialStoreReader(), requestData);
+ } catch (Exception e) {
+ throw new WorkflowException("An error occurred while creating GSI security context", e);
+ }
+ if (registeredHost.getType() instanceof GsisshHostType) {
+ GSIAuthenticationInfo authenticationInfo
+ = new MyProxyAuthenticationInfo(requestData.getMyProxyUserName(), requestData.getMyProxyPassword(), requestData.getMyProxyServerUrl(),
+ requestData.getMyProxyPort(), requestData.getMyProxyLifeTime(), System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY));
+ ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress());
+
+ Cluster pbsCluster = null;
+ try {
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo,
+ (((HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath()));
+ } catch (SSHApiException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ context.setPbsCluster(pbsCluster);
+ }
+ */
+ requestData = new RequestData("default");
+ GSISecurityContext context;
+ try {
+ context = new GSISecurityContext(CredentialReaderFactory.createCredentialStoreReader(), requestData);
+ } catch (Exception e) {
+ throw new WorkflowException("An error occurred while creating GSI security context", e);
+ }
+
+ if (registeredHost.getType() instanceof GsisshHostType) {
+ GSIAuthenticationInfo authenticationInfo
+ = new MyProxyAuthenticationInfo(requestData.getMyProxyUserName(), requestData.getMyProxyPassword(), requestData.getMyProxyServerUrl(),
+ requestData.getMyProxyPort(), requestData.getMyProxyLifeTime(), System.getProperty(Constants.TRUSTED_CERTIFICATE_SYSTEM_PROPERTY));
+ ServerInfo serverInfo = new ServerInfo(requestData.getMyProxyUserName(), registeredHost.getType().getHostAddress());
+
+ Cluster pbsCluster = null;
+ try {
+ String installedParentPath = ((HpcApplicationDeploymentType)
+ jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo,
+ (CommonUtils.getPBSJobManager(installedParentPath)));
+ } catch (SSHApiException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ context.setPbsCluster(pbsCluster);
+ }
+
+ jobExecutionContext.addSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT, context);
+ //Adding Amanzon Keys
+ if (this.configuration.getAmazonSecurityContext() != null) {
+ jobExecutionContext.addSecurityContext(AmazonSecurityContext.AMAZON_SECURITY_CONTEXT,
+ this.configuration.getAmazonSecurityContext());
+ }
+ //Adding SSH security
+ String sshUserName = configurationProperties.getProperty(Constants.SSH_USER_NAME);
+ String sshPrivateKey = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY);
+ String sshPrivateKeyPass = configurationProperties.getProperty(Constants.SSH_PRIVATE_KEY_PASS);
+ String sshPassword = configurationProperties.getProperty(Constants.SSH_PASSWORD);
+ String sshPublicKey = configurationProperties.getProperty(Constants.SSH_PUBLIC_KEY);
+ SSHSecurityContext sshSecurityContext = new SSHSecurityContext();
+ if (((SSHHostType) registeredHost.getType()).getHpcResource()) {
+ AuthenticationInfo authenticationInfo = null;
+ // we give higher preference to the password over keypair ssh authentication
+ if (sshPassword != null) {
+ authenticationInfo = new DefaultPasswordAuthenticationInfo(sshPassword);
+ } else {
+ authenticationInfo = new DefaultPublicKeyFileAuthentication(sshPublicKey, sshPrivateKey, sshPrivateKeyPass);
+ }
+ ServerInfo serverInfo = new ServerInfo(sshUserName, registeredHost.getType().getHostAddress());
+
+ Cluster pbsCluster = null;
+ try {
+ String installedParentPath = ((HpcApplicationDeploymentType)
+ jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType()).getInstalledParentPath();
+ pbsCluster = new PBSCluster(serverInfo, authenticationInfo,
+ (CommonUtils.getPBSJobManager(installedParentPath)));
+ } catch (SSHApiException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ sshSecurityContext.setPbsCluster(pbsCluster);
+ sshSecurityContext.setUsername(sshUserName);
+ } else {
+ sshSecurityContext = new SSHSecurityContext();
+ sshSecurityContext.setUsername(sshUserName);
+ sshSecurityContext.setPrivateKeyLoc(sshPrivateKey);
+ sshSecurityContext.setKeyPass(sshPrivateKeyPass);
+ }
+ jobExecutionContext.addSecurityContext(SSHSecurityContext.SSH_SECURITY_CONTEXT, sshSecurityContext);
+ }
+
+    /**
+     * Blocks until invoke() has recorded an outcome in {@code result}, then fails if that outcome
+     * is a failure.
+     *
+     * @throws WorkflowException if the service failed, if the waiting thread is interrupted, or on
+     *                           any unexpected runtime error (the notifier is informed for the latter)
+     */
+    @SuppressWarnings("boxing")
+    public synchronized void waitToFinish() throws WorkflowException {
+        try {
+            while (this.result == null) {
+                // The job is not submitted yet.
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    logger.error(e.getMessage(), e);
+                    // Restore the interrupt flag and stop waiting instead of looping on as if nothing happened.
+                    Thread.currentThread().interrupt();
+                    throw new WorkflowException("Interrupted while waiting for a service to finish: "
+                            + this.serviceInformation, e);
+                }
+            }
+            if (!this.result.booleanValue()) {
+                // No delegate invoker is ever assigned in this class, so fall back to this
+                // invoker's own fault instead of dereferencing a null field.
+                WSIFMessage faultMessage = (this.invoker != null) ? this.invoker.getFault() : getFault();
+                String message = "Error in a service: ";
+                // An implementation of WSIFMessage,
+                // WSIFMessageElement, implements toString(), which
+                // serialize the message XML.
+                message += (faultMessage != null) ? faultMessage.toString() : "no fault message available";
+                throw new WorkflowException(message);
+            }
+        } catch (RuntimeException e) {
+            logger.error(e.getMessage(), e);
+            String message = "Error while waiting for a service to finish: " + this.serviceInformation;
+            this.notifier.invocationFailed(message, e);
+            throw new WorkflowException(message, e);
+        } catch (Error e) {
+            logger.error(e.getMessage(), e);
+            String message = "Unexpected error: " + this.serviceInformation;
+            this.notifier.invocationFailed(message, e);
+            throw new WorkflowException(message, e);
+        }
+    }
+
+ /**
+ * @param name The name of the output parameter
+ * @return
+ * @throws WorkflowException
+ */
+ public Object getOutput(String name) throws WorkflowException {
+ try {
+ waitToFinish();
+ if (outPut instanceof XmlElement) {
+ Iterator children = ((XmlElement) outPut).children();
+ while (children.hasNext()) {
+ Object next = children.next();
+ if (((XmlElement) next).getName().equals(name)) {
+ return ((XmlElement) ((XmlElement) next).children().next()).children().next();
+ }
+ }
+ } else {
+ return outPut;
+ }
+ } catch (WorkflowException e) {
+ logger.error(e.getMessage(), e);
+ // An appropriate message has been set in the exception.
+ if (!this.failerSent) {
+ this.notifier.invocationFailed(e.getMessage(), e);
+ }
+ throw e;
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error while waiting for a output: " + name;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }
+ throw new WorkflowException("Output could not be found");
+ }
+
+    /**
+     * Returns the whole output message.
+     * <p>
+     * No delegate invoker is ever assigned in this class, so the previous unconditional delegation
+     * always ended in a NullPointerException. The message produced by invoke() is returned instead.
+     *
+     * @return the output message, or {@code null} when no output message is available yet
+     * @throws WorkflowException if a delegate invoker is present and fails
+     */
+    public WSIFMessage getOutputs() throws WorkflowException {
+        if (this.invoker != null) {
+            return this.invoker.getOutputs();
+        }
+        if (this.outPut instanceof WSIFMessage) {
+            return (WSIFMessage) this.outPut;
+        }
+        return null;
+    }
+
+ /**
+ * Always returns {@code null} in this invoker.
+ */
+ public WSIFClient getClient() {
+ return null;
+ }
+
+ /**
+ * Always returns {@code null}; the inputs recorded by setInput() are not exposed as a WSIFMessage.
+ */
+ public WSIFMessage getInputs() throws WorkflowException {
+ return null;
+ }
+
+ /**
+ * Always returns {@code null}; this invoker does not produce a fault message.
+ */
+ public WSIFMessage getFault() throws WorkflowException {
+ return null;
+ }
+
+    /**
+     * Builds the {@code invoke_InputParams} element: one child element per recorded input, named after
+     * the input and carrying its value as text. Inputs that are neither an XmlElement nor a String
+     * produce an empty child element.
+     * <p>
+     * NOTE(review): the code that used to fill {@code actualParameters} from these inputs (via
+     * GFacUtils.getInputActualParameter) is disabled, so the job's input message context stays empty.
+     *
+     * @return the input message element
+     * @throws AiravataAPIInvocationException if the service description lookup fails
+     * @throws RegistryException              if the service description is not registered
+     * @throws XMLStreamException             kept in the signature for compatibility
+     */
+    private OMElement getInParameters() throws AiravataAPIInvocationException, RegistryException, XMLStreamException {
+        OMFactory omFactory = OMAbstractFactory.getOMFactory();
+        OMElement invokeInputParams = omFactory.createOMElement(new QName("invoke_InputParams"));
+        ServiceDescription serviceDescription = airavataAPI.getApplicationManager().getServiceDescription(this.serviceName);
+        if (serviceDescription == null) {
+            throw new RegistryException(new Exception("Service Description not found in registry."));
+        }
+        // Walk both lists by position. Looking the index up with indexOf(name) returned the first
+        // match, which paired a repeated input name with the wrong value.
+        for (int index = 0; index < this.inputNames.size(); index++) {
+            String inputName = this.inputNames.get(index);
+            Object value = this.inputValues.get(index);
+            OMElement omElement = omFactory.createOMElement(new QName(inputName));
+            if (value instanceof XmlElement) {
+                // The text sits three levels down: value -> first child -> first child -> text.
+                omElement.setText((String) ((XmlElement) ((XmlElement) ((XmlElement) value).children().next()).children().next()).children().next());
+            } else if (value instanceof String) {
+                omElement.setText((String) value);
+            }
+            invokeInputParams.addChild(omElement);
+        }
+        return invokeInputParams;
+    }
+
+    /**
+     * Creates the output message context for a service: one empty ActualParameter per declared output
+     * parameter, typed according to the declared parameter type name. A type name that matches none of
+     * the known names leaves the ActualParameter with its initial type.
+     *
+     * @param serviceDescription the description whose output parameters are declared
+     * @return a message context holding one placeholder per output parameter, keyed by parameter name
+     */
+    private MessageContext getOutParameters(ServiceDescription serviceDescription) {
+        MessageContext outputs = new MessageContext();
+        OutputParameterType[] declaredOutputs = serviceDescription.getType().getOutputParametersArray();
+        for (OutputParameterType declared : declaredOutputs) {
+            ActualParameter placeholder = new ActualParameter();
+            String typeName = declared.getParameterType().getName();
+
+            // Scalar types
+            if ("String".equals(typeName)) {
+                placeholder.getType().changeType(StringParameterType.type);
+            } else if ("Double".equals(typeName)) {
+                placeholder.getType().changeType(DoubleParameterType.type);
+            } else if ("Integer".equals(typeName)) {
+                placeholder.getType().changeType(IntegerParameterType.type);
+            } else if ("Float".equals(typeName)) {
+                placeholder.getType().changeType(FloatParameterType.type);
+            } else if ("Boolean".equals(typeName)) {
+                placeholder.getType().changeType(BooleanParameterType.type);
+            } else if ("File".equals(typeName)) {
+                placeholder.getType().changeType(FileParameterType.type);
+            } else if ("URI".equals(typeName)) {
+                placeholder.getType().changeType(URIParameterType.type);
+            // Array types
+            } else if ("StringArray".equals(typeName)) {
+                placeholder.getType().changeType(StringArrayType.type);
+            } else if ("DoubleArray".equals(typeName)) {
+                placeholder.getType().changeType(DoubleArrayType.type);
+            } else if ("IntegerArray".equals(typeName)) {
+                placeholder.getType().changeType(IntegerArrayType.type);
+            } else if ("FloatArray".equals(typeName)) {
+                placeholder.getType().changeType(FloatArrayType.type);
+            } else if ("BooleanArray".equals(typeName)) {
+                placeholder.getType().changeType(BooleanArrayType.type);
+            } else if ("FileArray".equals(typeName)) {
+                placeholder.getType().changeType(FileArrayType.type);
+            } else if ("URIArray".equals(typeName)) {
+                placeholder.getType().changeType(URIArrayType.type);
+            // Process streams
+            } else if ("StdOut".equals(typeName)) {
+                placeholder.getType().changeType(StdOutParameterType.type);
+            } else if ("StdErr".equals(typeName)) {
+                placeholder.getType().changeType(StdErrParameterType.type);
+            }
+
+            outputs.addParameter(declared.getParameterName(), placeholder);
+        }
+        return outputs;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GFacInvoker.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GFacInvoker.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GFacInvoker.java
new file mode 100644
index 0000000..10edbd9
--- /dev/null
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GFacInvoker.java
@@ -0,0 +1,199 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.xbaya.invoker;
+
+import java.net.URI;
+import java.util.UUID;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.common.workflow.execution.context.WorkflowContextHeaderBuilder;
+import org.apache.airavata.workflow.model.exceptions.WorkflowException;
+import org.apache.airavata.xbaya.invoker.factory.InvokerFactory;
+import org.apache.airavata.xbaya.lead.NotificationHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import xsul.lead.LeadContextHeader;
+import xsul.wsdl.WsdlDefinitions;
+import xsul.wsdl.WsdlResolver;
+import xsul.wsif.WSIFMessage;
+import xsul.xhandler_soap_sticky_header.StickySoapHeaderHandler;
+import xsul.xwsif_runtime.WSIFClient;
+
+ /**
+  * {@link Invoker} that reaches a service through a Generic Factory (GFac) endpoint.
+  * <p>
+  * {@link #setup()} derives the service's {@code getWSDL} URL from the GFac URL and the port type name, loads that
+  * WSDL and asks {@link InvokerFactory} for the invoker that does the real work. Every other method delegates to
+  * that invoker; calling one of them before a successful {@link #setup()} raises {@link IllegalStateException}.
+  */
+ public class GFacInvoker implements Invoker {
+
+     private final static Logger logger = LoggerFactory.getLogger(GFacInvoker.class);
+
+     private String gfacURL;
+
+     private String messageBoxURL;
+
+     private QName portTypeQName;
+
+     /** The invoker created by {@link #setup()}; {@code null} until setup has succeeded. */
+     private Invoker invoker;
+
+     /** Only stored: the code that attached it to the client is disabled (see the FIXME in {@link #invoke()}). */
+     private LeadContextHeader leadContext;
+
+     /** Handed to the {@link NotificationHandler} in {@link #invoke()}; {@code null} when built from a LEAD context. */
+     private WorkflowContextHeaderBuilder builder;
+
+     /**
+      * Constructs a GFacInvoker that carries a LEAD context header.
+      *
+      * @param portTypeQName
+      *            The port type of the service to invoke
+      * @param gfacURL
+      *            The URL of the GFac service
+      * @param messageBoxURL
+      *            The URL of the message box, passed on to {@link InvokerFactory}
+      * @param context
+      *            The LEAD context header
+      */
+     public GFacInvoker(QName portTypeQName, String gfacURL, String messageBoxURL, LeadContextHeader context) {
+         this.portTypeQName = portTypeQName;
+         this.gfacURL = gfacURL;
+         this.messageBoxURL = messageBoxURL;
+         this.leadContext = context;
+     }
+
+     /**
+      * Constructs a GFacInvoker that carries a workflow context header builder.
+      *
+      * @param portTypeQName
+      *            The port type of the service to invoke
+      * @param gfacURL
+      *            The URL of the GFac service
+      * @param messageBoxURL
+      *            The URL of the message box, passed on to {@link InvokerFactory}
+      * @param context
+      *            The builder handed to the {@link NotificationHandler} on {@link #invoke()}
+      */
+     public GFacInvoker(QName portTypeQName, String gfacURL, String messageBoxURL, WorkflowContextHeaderBuilder context) {
+         this.portTypeQName = portTypeQName;
+         this.gfacURL = gfacURL;
+         this.messageBoxURL = messageBoxURL;
+         this.builder = context;
+     }
+
+     /**
+      * Loads the concrete WSDL of the service from GFac and sets up the delegate invoker.
+      *
+      * @throws WorkflowException
+      *             if the GFac URL or the port type is missing, or if loading the WSDL or setting up the delegate
+      *             fails
+      * @see org.apache.airavata.xbaya.invoker.Invoker#setup()
+      */
+     public void setup() throws WorkflowException {
+
+         if (this.gfacURL == null) {
+             String message = "The location of the Generic Factory is not specified.";
+             throw new WorkflowException(message);
+         }
+
+         if (this.portTypeQName == null) {
+             String message = "Error in finding the service name";
+             throw new WorkflowException(message);
+         }
+
+         try {
+             URI uri = new URI(this.gfacURL);
+
+             // Replace the last path segment (the GFac service name) with the local name of the port type.
+             String gfacPath = uri.getPath();
+             if (gfacPath != null && gfacPath.contains("/")) {
+                 gfacPath = gfacPath.substring(0, gfacPath.lastIndexOf('/') + 1) + portTypeQName.getLocalPart();
+             }
+             URI getWsdlURI = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(), gfacPath
+                     + "/getWSDL", uri.getQuery(), uri.getFragment());
+
+             logger.debug("getWSDL service:{}", getWsdlURI);
+
+             WsdlDefinitions concreteWSDL = WsdlResolver.getInstance().loadWsdl(getWsdlURI);
+
+             Invoker delegate = InvokerFactory.createInvoker(this.portTypeQName, concreteWSDL, null,
+                     this.messageBoxURL, null, true);
+             delegate.setup();
+             // Publish the delegate only once it is fully set up, so a failed setup never leaves a
+             // half-initialised invoker behind.
+             this.invoker = delegate;
+         } catch (WorkflowException xe) {
+             throw xe;
+         } catch (Exception e) {
+             throw new WorkflowException(e.getMessage(), e);
+         }
+
+     }
+
+     /**
+      * @see org.apache.airavata.xbaya.invoker.Invoker#getClient()
+      */
+     public WSIFClient getClient() {
+         return requireInvoker().getClient();
+     }
+
+     /**
+      * @throws WorkflowException
+      * @see org.apache.airavata.xbaya.invoker.Invoker#setOperation(java.lang.String)
+      */
+     public void setOperation(String operationName) throws WorkflowException {
+         requireInvoker().setOperation(operationName);
+     }
+
+     /**
+      * @throws WorkflowException
+      * @see org.apache.airavata.xbaya.invoker.Invoker#setInput(java.lang.String, java.lang.Object)
+      */
+     public void setInput(String name, Object value) throws WorkflowException {
+         requireInvoker().setInput(name, value);
+     }
+
+     /**
+      * @see org.apache.airavata.xbaya.invoker.Invoker#getInputs()
+      */
+     public WSIFMessage getInputs() throws WorkflowException {
+         return requireInvoker().getInputs();
+     }
+
+     /**
+      * Registers a {@link NotificationHandler} on the delegate's client and then invokes the delegate.
+      *
+      * @throws WorkflowException
+      * @see org.apache.airavata.xbaya.invoker.Invoker#invoke()
+      */
+     public boolean invoke() throws WorkflowException {
+         Invoker delegate = requireInvoker();
+         WSIFClient client = delegate.getClient();
+
+         // FIXME: Temporary fix - the sticky SOAP header handler ("use-lead-header") that used to attach
+         // this.leadContext to the client is disabled, so the LEAD context is currently not sent.
+
+         // This handler has to be added last so that it gets the entire SOAP message.
+         // NOTE(review): this.builder is null when the LeadContextHeader constructor was used - confirm that
+         // NotificationHandler accepts a null builder.
+         NotificationHandler notificationHandler = new NotificationHandler(this.builder);
+         client.addHandler(notificationHandler);
+         return delegate.invoke();
+     }
+
+     /**
+      * @see org.apache.airavata.xbaya.invoker.Invoker#getOutputs()
+      */
+     public WSIFMessage getOutputs() throws WorkflowException {
+         return requireInvoker().getOutputs();
+     }
+
+     /**
+      * @throws WorkflowException
+      * @see org.apache.airavata.xbaya.invoker.Invoker#getOutput(java.lang.String)
+      */
+     public Object getOutput(String name) throws WorkflowException {
+         return requireInvoker().getOutput(name);
+     }
+
+     /**
+      * @throws WorkflowException
+      * @see org.apache.airavata.xbaya.invoker.Invoker#getFault()
+      */
+     public WSIFMessage getFault() throws WorkflowException {
+         return requireInvoker().getFault();
+     }
+
+     /**
+      * Returns the delegate, failing with a descriptive error instead of a bare NullPointerException when
+      * {@link #setup()} has not completed successfully yet.
+      */
+     private Invoker requireInvoker() {
+         if (this.invoker == null) {
+             throw new IllegalStateException("GFacInvoker is not set up; call setup() before using it.");
+         }
+         return this.invoker;
+     }
+
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GenericInvoker.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GenericInvoker.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GenericInvoker.java
new file mode 100644
index 0000000..e59c150
--- /dev/null
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/GenericInvoker.java
@@ -0,0 +1,529 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.xbaya.invoker;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.common.workflow.execution.context.WorkflowContextHeaderBuilder;
+import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.schemas.wec.ContextHeaderDocument;
+import org.apache.airavata.workflow.model.exceptions.WorkflowException;
+import org.apache.airavata.workflow.model.exceptions.WorkflowRuntimeException;
+import org.apache.airavata.xbaya.invoker.factory.InvokerFactory;
+import org.apache.airavata.xbaya.jython.lib.ServiceNotifiable;
+import org.apache.airavata.xbaya.jython.lib.WorkflowNotifiable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmlpull.v1.builder.XmlElement;
+
+import xsul.wsdl.WsdlDefinitions;
+import xsul.wsdl.WsdlException;
+import xsul.wsdl.WsdlResolver;
+import xsul.wsif.WSIFMessage;
+import xsul.xhandler_soap_sticky_header.StickySoapHeaderHandler;
+import xsul.xwsif_runtime.WSIFClient;
+
+public class GenericInvoker implements Invoker {
+
+ private static final Logger logger = LoggerFactory.getLogger(GenericInvoker.class);
+
+ private String nodeID;
+
+ private QName portTypeQName;
+
+ private String wsdlLocation;
+
+ private String serviceInformation;
+
+ private String messageBoxURL;
+
+ private String gfacURL;
+
+ private Invoker invoker;
+
+ private Future<Boolean> result;
+
+ private ServiceNotifiable notifier;
+
+ private ContextHeaderDocument.ContextHeader contextHeader;
+
+ private String topic;
+
+ /**
+ * used for notification
+ */
+ private List<Object> inputValues = new ArrayList<Object>();
+
+ /**
+ * used for notification
+ */
+ private List<String> inputNames = new ArrayList<String>();
+
+ boolean failerSent;
+
+ private WsdlDefinitions wsdlDefinitionObject;
+
+ /**
+ * Creates an InvokerWithNotification.
+ *
+ * @param portTypeQName
+ *
+ * @param wsdlLocation
+ * The URL of WSDL of the service to invoke
+ * @param nodeID
+ * The ID of the service
+ * @param notifier
+ * The notification sender
+ */
+ public GenericInvoker(QName portTypeQName, String wsdlLocation, String nodeID, WorkflowNotifiable notifier) {
+ this(portTypeQName, wsdlLocation, nodeID, null, notifier);
+ }
+
+ /**
+ * Creates an InvokerWithNotification.
+ *
+ * @param portTypeQName
+ *
+ * @param wsdlLocation
+ * The URL of WSDL of the service to invoke
+ * @param nodeID
+ * The ID of the service
+ * @param gfacURL
+ * The URL of GFac service.
+ * @param notifier
+ * The notification sender
+ */
+ public GenericInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String gfacURL,
+ WorkflowNotifiable notifier) {
+ this(portTypeQName, wsdlLocation, nodeID, null, gfacURL, notifier);
+ }
+
+ /**
+ * Creates an InvokerWithNotification.
+ *
+ * @param portTypeQName
+ *
+ * @param wsdlLocation
+ * The URL of WSDL of the service to invoke
+ * @param nodeID
+ * The ID of the service
+ * @param messageBoxURL
+ * @param gfacURL
+ * The URL of GFac service.
+ * @param notifier
+ * The notification sender
+ */
+ public GenericInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String messageBoxURL,
+ String gfacURL, WorkflowNotifiable notifier) {
+ this.nodeID = nodeID;
+ this.portTypeQName = portTypeQName;
+ this.wsdlLocation = wsdlLocation;
+ this.serviceInformation = wsdlLocation;
+ this.messageBoxURL = messageBoxURL;
+ this.gfacURL = gfacURL;
+ this.notifier = notifier.createServiceNotificationSender(nodeID);
+ this.failerSent = false;
+ this.contextHeader = WorkflowContextHeaderBuilder.removeOtherSchedulingConfig(nodeID,WorkflowContextHeaderBuilder.getCurrentContextHeader());
+ this.topic = notifier.getTopic();
+ }
+
+ /**
+ *
+ * @param portTypeQName
+ * @param wsdl
+ * @param nodeID
+ * @param messageBoxURL
+ * @param gfacURL
+ * @param notifier
+ */
+ public GenericInvoker(QName portTypeQName, WsdlDefinitions wsdl, String nodeID, String messageBoxURL,
+ String gfacURL, WorkflowNotifiable notifier) {
+ final String wsdlStr = xsul.XmlConstants.BUILDER.serializeToString(wsdl);
+ this.nodeID = nodeID;
+ this.portTypeQName = portTypeQName;
+ this.wsdlDefinitionObject = wsdl;
+ this.messageBoxURL = messageBoxURL;
+ this.serviceInformation = wsdlStr;
+ this.gfacURL = gfacURL;
+ this.notifier = notifier.createServiceNotificationSender(nodeID);
+ this.failerSent = false;
+ this.contextHeader = WorkflowContextHeaderBuilder.removeOtherSchedulingConfig(nodeID,WorkflowContextHeaderBuilder.getCurrentContextHeader());
+ this.topic = notifier.getTopic();
+ }
+
+ /**
+ *
+ * @throws WorkflowException
+ */
+ public void setup() throws WorkflowException {
+ try {
+ WsdlDefinitions definitions = null;
+ if (this.wsdlLocation != null && !this.wsdlLocation.equals("")) {
+ WsdlResolver resolver = WsdlResolver.getInstance();
+ definitions = resolver.loadWsdl(new File(".").toURI(), new URI(this.wsdlLocation));
+ } else {
+ definitions = this.wsdlDefinitionObject;
+ }
+
+ setup(definitions);
+
+ } catch (WorkflowException e) {
+ logger.error(e.getMessage(), e);
+ // An appropriate message has been set in the exception.
+ this.notifier.invocationFailed(e.getMessage(), e);
+ throw e;
+ } catch (URISyntaxException e) {
+ logger.error(e.getMessage(), e);
+ String message = "The location of the WSDL has to be a valid URL or file path: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (WsdlException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error in processing the WSDL: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error in processing the WSDL: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ private void setup(WsdlDefinitions definitions) throws WorkflowException {
+
+ // Set LEAD context header.
+ WorkflowContextHeaderBuilder builder;
+ if(contextHeader == null){
+ builder = new WorkflowContextHeaderBuilder(this.notifier.getEventSink()
+ .getAddress(), this.gfacURL, null, this.topic,
+ "xbaya-experiment", this.messageBoxURL);
+ }else{
+ builder = new WorkflowContextHeaderBuilder(contextHeader);
+ }
+ if(builder.getWorkflowMonitoringContext() == null){
+ builder.addWorkflowMonitoringContext(this.notifier.getEventSink().getAddress(),
+ this.topic,this.nodeID,this.messageBoxURL);
+ } else {
+ builder.getWorkflowMonitoringContext().setWorkflowInstanceId(this.notifier.getWorkflowID().toASCIIString());
+ }
+ builder.getWorkflowMonitoringContext().setWorkflowNodeId(this.nodeID);
+ builder.getWorkflowMonitoringContext().setServiceInstanceId(this.nodeID);
+ builder.getWorkflowMonitoringContext().setWorkflowTimeStep(1);
+ builder.setUserIdentifier("xbaya-user");
+ //todo write a UI component to collect this information and pass it through Header
+// builder.setGridMyProxyRepository("myproxy.nersc.gov","$user","$passwd",14000);
+ StickySoapHeaderHandler handler = new StickySoapHeaderHandler("use-workflowcontext-header", builder.getXml());
+ // Create Invoker
+ this.invoker = InvokerFactory.createInvoker(this.portTypeQName, definitions, this.gfacURL, this.messageBoxURL,
+ builder, true);
+ this.invoker.setup();
+
+ WSIFClient client = this.invoker.getClient();
+ client.addHandler(handler);
+
+ WsdlResolver resolver = WsdlResolver.getInstance();
+ // Get the concrete WSDL from invoker.setup() and set it to the
+ // notifier.
+
+ this.notifier.setServiceID(this.nodeID);
+ // if (this.wsdlLocation != null) {
+ // this.notifier.setServiceID(this.nodeID);
+ // } else {
+ // String name = this.portTypeQName.getLocalPart();
+ // this.notifier.setServiceID(name);
+ // }
+ }
+
+ /**
+ *
+ * @param operationName
+ * The name of the operation
+ * @throws WorkflowException
+ */
+ public void setOperation(String operationName) throws WorkflowException {
+ try {
+ this.invoker.setOperation(operationName);
+ } catch (WorkflowException e) {
+ logger.error(e.getMessage(), e);
+ // An appropriate message has been set in the exception.
+ this.notifier.invocationFailed(e.getMessage(), e);
+ throw e;
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "The WSDL does not conform to the invoking service: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ *
+ * @param name
+ * The name of the input parameter
+ * @param value
+ * The value of the input parameter
+ * @throws WorkflowException
+ */
+ public void setInput(String name, Object value) throws WorkflowException {
+ try {
+ if (value instanceof XmlElement) {
+ logger.debug("value: " + XMLUtil.xmlElementToString((XmlElement) value));
+ }
+ this.inputNames.add(name);
+ this.inputValues.add(value);
+ this.invoker.setInput(name, value);
+ } catch (WorkflowException e) {
+ logger.error(e.getMessage(), e);
+ // An appropriate message has been set in the exception.
+ this.notifier.invocationFailed(e.getMessage(), e);
+ throw e;
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error in setting an input. name: " + name + " value: " + value;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ *
+ * @return
+ * @throws WorkflowException
+ */
+ public synchronized boolean invoke() throws WorkflowException {
+ try {
+ WSIFMessage inputMessage = this.invoker.getInputs();
+ logger.debug("inputMessage: " + XMLUtil.xmlElementToString((XmlElement) inputMessage));
+ this.notifier.invokingService(inputMessage);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ this.result = executor.submit(new Callable<Boolean>() {
+ @SuppressWarnings("boxing")
+ public Boolean call() {
+ try {
+ boolean success = GenericInvoker.this.invoker.invoke();
+ if (success) {
+ // Send notification
+ WSIFMessage outputMessage = GenericInvoker.this.invoker.getOutputs();
+ // An implementation of WSIFMessage,
+ // WSIFMessageElement, implements toString(), which
+ // serialize the message XML.
+ logger.debug("outputMessage: " + outputMessage);
+ GenericInvoker.this.notifier.serviceFinished(outputMessage);
+ } else {
+ //if error occurse gfac-axis2 write the error in to output not to the fault
+ WSIFMessage faultMessage = GenericInvoker.this.invoker.getOutputs();
+ // An implementation of WSIFMessage,
+ // WSIFMessageElement, implements toString(), which
+ // serialize the message XML.
+ logger.debug("received fault: " + faultMessage);
+ GenericInvoker.this.notifier.receivedFault(faultMessage);
+ GenericInvoker.this.failerSent = true;
+ }
+ return success;
+ } catch (WorkflowException e) {
+ logger.error(e.getMessage(), e);
+ // An appropriate message has been set in the exception.
+ GenericInvoker.this.notifier.invocationFailed(e.getMessage(), e);
+ GenericInvoker.this.failerSent = true;
+ throw new WorkflowRuntimeException(e);
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error in invoking a service: " + GenericInvoker.this.serviceInformation;
+ GenericInvoker.this.notifier.invocationFailed(message, e);
+ GenericInvoker.this.failerSent = true;
+ throw e;
+ }
+ }
+ });
+
+ // Kill the thread inside of executor. This is necessary for Jython
+ // script to finish.
+ executor.shutdown();
+
+ // Let other threads know that job has been submitted.
+ notifyAll();
+
+ // Check if the invocation itself fails. This happens immediately.
+ try {
+ this.result.get(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ } catch (TimeoutException e) {
+ // The job is probably running fine.
+ // The normal case.
+ return true;
+ } catch (ExecutionException e) {
+ // The service-failed notification should have been sent
+ // already.
+ logger.error(e.getMessage(), e);
+ String message = "Error in invoking a service: " + this.serviceInformation;
+ throw new WorkflowException(message, e);
+ }
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error in invoking a service: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }
+ return true;
+ }
+
+ /**
+ *
+ * @throws WorkflowException
+ */
+ @SuppressWarnings("boxing")
+ public synchronized void waitToFinish() throws WorkflowException {
+ try {
+ while (this.result == null) {
+ // The job is not submitted yet.
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ // Wait for the job to finish.
+ Boolean success = this.result.get();
+ if (success == false) {
+ WSIFMessage faultMessage = this.invoker.getFault();
+ String message = "Error in a service: ";
+ // An implementation of WSIFMessage,
+ // WSIFMessageElement, implements toString(), which
+ // serialize the message XML.
+ message += faultMessage.toString();
+ throw new WorkflowException(message);
+ }
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ } catch (ExecutionException e) {
+ // The service-failed notification should have been sent already.
+ logger.error(e.getMessage(), e);
+ String message = "Error in invoking a service: " + this.serviceInformation;
+ throw new WorkflowException(message, e);
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error while waiting for a service to finish: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ *
+ * @param name
+ * The name of the output parameter
+ * @return
+ * @throws WorkflowException
+ */
+ public Object getOutput(String name) throws WorkflowException {
+ try {
+ waitToFinish();
+ Object output = this.invoker.getOutput(name);
+ if (output instanceof XmlElement) {
+ logger.debug("output: " + XMLUtil.xmlElementToString((XmlElement) output));
+ }
+ return output;
+ } catch (WorkflowException e) {
+ logger.error(e.getMessage(), e);
+ // An appropriate message has been set in the exception.
+ if (!this.failerSent) {
+ this.notifier.invocationFailed(e.getMessage(), e);
+ }
+ throw e;
+ } catch (RuntimeException e) {
+ logger.error(e.getMessage(), e);
+ String message = "Error while waiting for a output: " + name;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ } catch (Error e) {
+ logger.error(e.getMessage(), e);
+ String message = "Unexpected error: " + this.serviceInformation;
+ this.notifier.invocationFailed(message, e);
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ *
+ * @return
+ * @throws WorkflowException
+ */
+ public WSIFMessage getOutputs() throws WorkflowException {
+ return this.invoker.getOutputs();
+ }
+
+ @Override
+ public WSIFClient getClient() {
+ return null;
+ }
+
+ @Override
+ public WSIFMessage getInputs() throws WorkflowException {
+ return null;
+ }
+
+ @Override
+ public WSIFMessage getFault() throws WorkflowException {
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/Invoker.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/Invoker.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/Invoker.java
new file mode 100644
index 0000000..4580587
--- /dev/null
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/Invoker.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.xbaya.invoker;
+
+import org.apache.airavata.workflow.model.exceptions.WorkflowException;
+
+import xsul.wsif.WSIFMessage;
+import xsul.xwsif_runtime.WSIFClient;
+
+ /**
+  * Contract shared by the invokers in this package: set the invoker up, choose an operation, supply the inputs,
+  * invoke the service and read back its outputs or its fault.
+  */
+ public interface Invoker {
+
+     /**
+      * Sets up the service.
+      *
+      * @throws WorkflowException
+      *             if the service cannot be set up
+      */
+     void setup() throws WorkflowException;
+
+     /**
+      * @return The WSIFClient.
+      */
+     WSIFClient getClient();
+
+     /**
+      * Sets the operation name to invoke.
+      *
+      * @param operationName
+      *            The name of the operation
+      * @throws WorkflowException
+      *             if the operation cannot be set
+      */
+     void setOperation(String operationName) throws WorkflowException;
+
+     /**
+      * Sets an input parameter.
+      *
+      * @param name
+      *            The name of the input parameter
+      * @param value
+      *            The value of the input parameter
+      * @throws WorkflowException
+      *             if the input cannot be set
+      */
+     void setInput(String name, Object value) throws WorkflowException;
+
+     /**
+      * Returns all the input parameters.
+      *
+      * @return The input parameters
+      * @throws WorkflowException
+      *             if the inputs cannot be provided
+      */
+     WSIFMessage getInputs() throws WorkflowException;
+
+     /**
+      * Invokes the service.
+      *
+      * @return true if the invocation succeeds; false otherwise
+      * @throws WorkflowException
+      *             if the invocation cannot be carried out
+      */
+     boolean invoke() throws WorkflowException;
+
+     /**
+      * Returns all the output parameters.
+      *
+      * @return The output parameters
+      * @throws WorkflowException
+      *             if the outputs cannot be provided
+      */
+     WSIFMessage getOutputs() throws WorkflowException;
+
+     /**
+      * Returns the output of a specified name.
+      *
+      * @param name
+      *            The name of the output parameter
+      * @return The value of the output
+      * @throws WorkflowException
+      *             if the output cannot be provided
+      */
+     Object getOutput(String name) throws WorkflowException;
+
+     /**
+      * Returns the fault message.
+      *
+      * @return The fault message
+      * @throws WorkflowException
+      *             if the fault message cannot be provided
+      */
+     WSIFMessage getFault() throws WorkflowException;
+
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/MsgBoxWsaResponsesCorrelator.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/MsgBoxWsaResponsesCorrelator.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/MsgBoxWsaResponsesCorrelator.java
new file mode 100644
index 0000000..5fee29f
--- /dev/null
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/MsgBoxWsaResponsesCorrelator.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.xbaya.invoker;
+
+import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.wsmg.msgbox.client.MsgBoxClient;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmlpull.v1.builder.XmlDocument;
+import org.xmlpull.v1.builder.XmlElement;
+import org.xmlpull.v1.builder.XmlInfosetBuilder;
+import xsul.MLogger;
+import xsul.XmlConstants;
+import xsul.XsulException;
+import xsul.processor.DynamicInfosetProcessorException;
+import xsul.ws_addressing.WsaEndpointReference;
+import xsul.ws_addressing.WsaMessageInformationHeaders;
+import xsul.wsif.WSIFMessage;
+import xsul.wsif.impl.WSIFMessageElement;
+import xsul.xwsif_runtime_async.WSIFAsyncResponseListener;
+import xsul.xwsif_runtime_async.WSIFAsyncResponsesCorrelator;
+import xsul.xwsif_runtime_async.WSIFAsyncWsaResponsesCorrelatorBase;
+
+import javax.xml.stream.XMLStreamException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.rmi.RemoteException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class MsgBoxWsaResponsesCorrelator extends WSIFAsyncWsaResponsesCorrelatorBase
+ implements WSIFAsyncResponsesCorrelator, Runnable
+{
+ private static final Logger logger = LoggerFactory.getLogger(MsgBoxWsaResponsesCorrelator.class);
+ private final static XmlInfosetBuilder builder = XmlConstants.BUILDER;
+
+ private String msgBoxServiceLoc;
+ private MsgBoxClient msgBoxClient;
+ EndpointReference msgBoxAddr;
+ private Thread messageBoxDonwloader;
+
+ private AsynchronousInvoker invoker;
+
+ public MsgBoxWsaResponsesCorrelator(String msgBoxServiceLoc,AsynchronousInvoker output)
+ throws DynamicInfosetProcessorException
+ {
+ this.invoker = output;
+ this.msgBoxServiceLoc = msgBoxServiceLoc;
+ msgBoxClient = new MsgBoxClient();
+ try {
+ msgBoxAddr = msgBoxClient.createMessageBox(msgBoxServiceLoc,5000L);
+ try {
+ setReplyTo(new WsaEndpointReference(new URI(msgBoxAddr.getAddress())));
+ } catch (URISyntaxException e) {
+ logger.error(e.getLocalizedMessage(),e); //To change body of catch statement use File | Settings | File Templates.
+ }
+ messageBoxDonwloader = new Thread(this, Thread.currentThread().getName()+"-async-msgbox-correlator");
+ messageBoxDonwloader.setDaemon(true);
+ messageBoxDonwloader.start();
+ } catch (RemoteException e) {
+ logger.error(e.getLocalizedMessage(),e); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+
+// public void setMsgBoxAddr(WsaEndpointReference msgBoxAddr) {
+// this.msgBoxAddr = msgBoxAddr;
+// }
+
+
+
+ public void run() {
+ while(true) {
+ try {
+ Iterator<OMElement> omElementIterator = msgBoxClient.takeMessagesFromMsgBox(msgBoxAddr, 5000L);
+ List<XmlElement> xmlArrayList = new ArrayList<XmlElement>();
+ while (omElementIterator.hasNext()){
+ OMElement next = omElementIterator.next();
+ String message = next.toStringWithConsume();
+ xmlArrayList.add(XMLUtil.stringToXmlElement3(message));
+ }
+ // now hard work: find callbacks
+ for (int i = 0; i < xmlArrayList.size(); i++) {
+ XmlElement m = xmlArrayList.get(i);
+ try {
+ logger.debug(Thread.currentThread().getName());
+ WSIFMessageElement e = new WSIFMessageElement(m);
+ this.invoker.setOutputMessage(e);
+ //ideally there are no multiple messages, so we can return from this thread at this point
+ //otherwise this thread will keep running forever for each worfklow node, so there can be large
+ // number of waiting threads in an airavata deployment
+ return;
+ } catch (Throwable e) {
+ logger.error(e.getLocalizedMessage(),e); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ try {
+ Thread.currentThread().sleep(1000L); //do not overload msg box service ...
+ } catch (InterruptedException e) {
+ break;
+ }
+ } catch (XsulException e) {
+ logger.error("could not retrieve messages");
+ break;
+ } catch (RemoteException e) {
+ logger.error("could not retrieve messages");
+ break;
+ } catch (XMLStreamException e) {
+ logger.error("could not retrieve messages");
+ break;
+ } catch (Exception e){
+ logger.error("could not retrieve messages");
+ break;
+ }
+ }
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/SimpleInvoker.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/SimpleInvoker.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/SimpleInvoker.java
new file mode 100644
index 0000000..54cb7f4
--- /dev/null
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/SimpleInvoker.java
@@ -0,0 +1,260 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.xbaya.invoker;
+
+import java.util.Iterator;
+
+import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.workflow.model.exceptions.WorkflowException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xmlpull.v1.builder.XmlElement;
+
+import xsul.wsdl.WsdlDefinitions;
+import xsul.wsif.WSIFMessage;
+import xsul.wsif.WSIFOperation;
+import xsul.wsif.WSIFPort;
+import xsul.wsif.WSIFService;
+import xsul.wsif.WSIFServiceFactory;
+import xsul.wsif.spi.WSIFProviderManager;
+import xsul.xwsif_runtime.WSIFClient;
+import xsul.xwsif_runtime.WSIFRuntime;
+
+public class SimpleInvoker implements Invoker {
+
+ private static final Logger log = LoggerFactory.getLogger(GenericInvoker.class);
+
+ protected WSIFClient client;
+
+ private WsdlDefinitions definitions;
+
+ private WSIFOperation operation;
+
+ private WSIFMessage inputMessage;
+
+ private volatile WSIFMessage outputMessage;
+
+ private WSIFMessage faultMessage;
+
+ private boolean lock = false;
+
+ static {
+ WSIFProviderManager.getInstance().addProvider(new xsul.wsif_xsul_soap_http.Provider());
+ }
+
+ /**
+ * Constructs a SimpleInvoker.
+ *
+ * @param definitions
+ */
+ public SimpleInvoker(WsdlDefinitions definitions) {
+ this.definitions = definitions;
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#setup()
+ */
+ public void setup() throws WorkflowException {
+ try {
+ WSIFService service = WSIFServiceFactory.newInstance().getService(this.definitions);
+ WSIFPort port = service.getPort();
+ this.client = WSIFRuntime.getDefault().newClientFor(port);
+ this.client.setAsyncResponseTimeoutInMs(999999999);
+ } catch (RuntimeException e) {
+ String message = "The WSDL is in the wrong format";
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#getClient()
+ */
+ public WSIFClient getClient() {
+ return this.client;
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#setOperation(java.lang.String)
+ */
+ public void setOperation(String operationName) throws WorkflowException {
+ try {
+ WSIFPort port = this.client.getPort();
+ this.operation = port.createOperation(operationName);
+ this.inputMessage = this.operation.createInputMessage();
+ this.outputMessage = this.operation.createOutputMessage();
+ this.faultMessage = this.operation.createFaultMessage();
+ } catch (RuntimeException e) {
+ String message = "The WSDL does not conform to the invoking service.";
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#setInput(java.lang.String, java.lang.Object)
+ */
+ public void setInput(String name, Object value) throws WorkflowException {
+ try {
+ if (value instanceof XmlElement) {
+ // If the value is a complex type, change the name of the
+ // element to the correct one.
+ XmlElement valueElement = (XmlElement) value;
+ valueElement.setName(name);
+ } else if (value instanceof String) {
+ if(XMLUtil.isXML((String)value)){
+ XmlElement valueElement = XMLUtil.stringToXmlElement3((String) value);
+ valueElement.setName(name);
+ value = valueElement;
+ }
+ // Simple case.
+ } else {
+ // convert int, doule to string.
+ value = "" + value;
+ }
+ this.inputMessage.setObjectPart(name, value);
+ } catch (RuntimeException e) {
+ String message = "Error in setting an input. name: " + name + " value: " + value;
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#getInputs()
+ */
+ public WSIFMessage getInputs() {
+ return this.inputMessage;
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#invoke()
+ */
+ public boolean invoke() throws WorkflowException {
+ try {
+ boolean success = this.operation.executeRequestResponseOperation(this.inputMessage, this.outputMessage,
+ this.faultMessage);
+ while(this.outputMessage == null){
+
+ }
+ return success;
+ } catch (RuntimeException e) {
+ String message = "Error in invoking a service.";
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#getOutputs()
+ */
+ public WSIFMessage getOutputs() {
+ if (lock) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ return this.outputMessage;
+ }
+
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#getOutput(java.lang.String)
+ */
+ public Object getOutput(String name) throws WorkflowException {
+ try {
+ // This code doesn't work when the output is a complex type.
+ // Object output = this.outputMessage.getObjectPart(name);
+ // return output;
+
+ XmlElement outputElement = (XmlElement) this.outputMessage;
+ XmlElement valueElement = outputElement.element(null, name);
+ Iterator childIt = valueElement.children();
+ int numberOfChildren = 0;
+ while (childIt.hasNext()) {
+ childIt.next();
+ numberOfChildren++;
+ }
+ if (numberOfChildren == 1) {
+ Object child = valueElement.children().next();
+ if (child instanceof String) {
+ // Value is a simple type. Return the string.
+ String value = (String) child;
+ return value;
+ }
+ if (child instanceof XmlElement) {
+ log.debug("output: " + XMLUtil.xmlElementToString((XmlElement) child));
+ Object child1 = ((XmlElement) child).children().next();
+ if (child1 instanceof String) {
+ // Value is a simple type. Return the string.
+ String value = (String) child1;
+ return value;
+ }
+ }
+ }
+ // Value is a complex type. Return the whole XmlElement so that we
+ // can set it to the next service as it is.
+ return valueElement;
+ } catch (RuntimeException e) {
+ String message = "Error in getting output. name: " + name;
+ throw new WorkflowException(message, e);
+ }
+ }
+
+ /**
+ * @see org.apache.airavata.xbaya.invoker.Invoker#getFault()
+ */
+ public WSIFMessage getFault() {
+ return this.faultMessage;
+ }
+
+ public WsdlDefinitions getDefinitions() {
+ return definitions;
+ }
+
+ public WSIFOperation getOperation() {
+ return operation;
+ }
+
+ public WSIFMessage getInputMessage() {
+ return inputMessage;
+ }
+
+ public synchronized WSIFMessage getOutputMessage() {
+ return outputMessage;
+ }
+
+ public WSIFMessage getFaultMessage() {
+ return faultMessage;
+ }
+
+ public synchronized void setOutputMessage(WSIFMessage outputMessage) {
+ log.debug("Setting output message");
+ this.outputMessage = outputMessage;
+ }
+
+ public void setLock(boolean lock) {
+ this.lock = lock;
+ }
+
+ public boolean isLock() {
+ return lock;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/WorkflowInputUtil.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/WorkflowInputUtil.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/WorkflowInputUtil.java
new file mode 100644
index 0000000..2d892f0
--- /dev/null
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/WorkflowInputUtil.java
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.xbaya.invoker;
+
+import org.apache.airavata.common.utils.StringUtil;
+import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.workflow.model.component.ws.WSComponentPort;
+import org.apache.airavata.workflow.model.exceptions.WorkflowRuntimeException;
+import org.apache.airavata.xbaya.XBayaConstants;
+import org.apache.airavata.xbaya.lead.LEADTypes;
+
+import javax.xml.namespace.QName;
+
+public class WorkflowInputUtil {
+
+ public static String createInputForGFacService(WSComponentPort port,String input){
+ String paramType = port.getType().getLocalPart();
+ StringBuffer inputString = new StringBuffer("<");
+ if("StringParameterType".equals(paramType) || "URIParameterType".equals(paramType) ||
+ "DoubleParameterType".equals(paramType) || "IntegerParameterType".equals(paramType)
+ || "FloatParameterType".equals(paramType)|| "BooleanParameterType".equals(paramType)
+ || "FileParameterType".equals(paramType)){
+ inputString.append(port.getName()).append(">").
+ append(getValueElement(input)).append("</").append(port.getName()).append(">");
+ }else if(paramType.endsWith("ArrayType")){
+ inputString.append(port.getName()).append(">");
+ String[] valueList = StringUtil.getElementsFromString(input);
+ for(String inputValue:valueList){
+ inputString.append(getValueElement(inputValue));
+ }
+ inputString.append(getValueElement(port.getName()));
+ }
+ inputString.append(">");
+ return inputString.toString();
+ }
+
+ private static String getValueElement(String value){
+ return "<value>" + value + "</value>";
+ }
+ public static Object parseValue(WSComponentPort input, String valueString) {
+ String name = input.getName();
+ if (false) {
+ // Some user wants to pass empty strings, so this check is disabled.
+ if (valueString.length() == 0) {
+ throw new WorkflowRuntimeException("Input parameter, " + name + ", cannot be empty");
+ }
+ }
+ QName type = input.getType();
+ Object value;
+ if (LEADTypes.isKnownType(type)) {
+ // TODO check the type.
+ value = valueString;
+ } else {
+ try {
+ if(XBayaConstants.HTTP_SCHEMAS_AIRAVATA_APACHE_ORG_GFAC_TYPE.equals(input.getType().getNamespaceURI())){
+ value = XMLUtil.stringToXmlElement3(WorkflowInputUtil.createInputForGFacService(input, valueString));
+ }else {
+ throw new WorkflowRuntimeException("Input parameter, " + name + ", Unkown Type");
+ }
+ } catch (RuntimeException e) {
+ throw new WorkflowRuntimeException("Input parameter, " + name + ", is not valid XML", e);
+ }
+ }
+ return value;
+ }
+}