You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2013/01/09 17:49:14 UTC

svn commit: r1430956 - in /airavata/trunk/modules: gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/ gfac-core/src/main/java/org/apache/airavata/core/gfac/ xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/

Author: lahiru
Date: Wed Jan  9 16:49:13 2013
New Revision: 1430956

URL: http://svn.apache.org/viewvc?rev=1430956&view=rev
Log:
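Fixing EmbeddedGFacInvoker: GfacAPI.gridJobSubmit now takes workflowNodeId and workflowInstanceId as arguments, and EmbeddedGFacInvoker.invoke() calls it directly instead of through an executor thread.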

Modified:
    airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java
    airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java

Modified: airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java?rev=1430956&r1=1430955&r2=1430956&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java (original)
+++ airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java Wed Jan  9 16:49:13 2013
@@ -203,7 +203,9 @@ public class GFacMessageReciever impleme
         try {
 
             gfacAPI = new GfacAPI();
-            invocationContext = gfacAPI.gridJobSubmit(jobContext, (GFacConfiguration) context.getProperty(GFacService.GFAC_CONFIGURATION));
+            String workflowNodeId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowNodeId();
+            String workflowInstanceId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowInstanceId();
+            invocationContext = gfacAPI.gridJobSubmit(jobContext, (GFacConfiguration) context.getProperty(GFacService.GFAC_CONFIGURATION),workflowNodeId,workflowInstanceId);
             /*
              * Add notifiable object
              */

Modified: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java?rev=1430956&r1=1430955&r2=1430956&view=diff
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java (original)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/core/gfac/GfacAPI.java Wed Jan  9 16:49:13 2013
@@ -46,9 +46,7 @@ public class GfacAPI {
             "." + WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getExperimentId());
     public static final String REPOSITORY_PROPERTIES = "airavata-server.properties";
 
-    public DefaultInvocationContext gridJobSubmit(JobContext jobContext,GFacConfiguration gfacConfig) throws Exception {
-        String workflowNodeId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowNodeId();
-        String workflowInstanceId = WorkflowContextHeaderBuilder.getCurrentContextHeader().getWorkflowMonitoringContext().getWorkflowInstanceId();
+    public DefaultInvocationContext gridJobSubmit(JobContext jobContext,GFacConfiguration gfacConfig,String workflowNodeId,String workflowInstanceId) throws Exception {
         WorkflowTrackingNotification workflowNotification = new WorkflowTrackingNotification(jobContext.getBrokerURL(),
                 jobContext.getTopic(),workflowNodeId,workflowInstanceId);
         LoggingNotification loggingNotification = new LoggingNotification();

Modified: airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java?rev=1430956&r1=1430955&r2=1430956&view=diff
==============================================================================
--- airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java (original)
+++ airavata/trunk/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/invoker/EmbeddedGFacInvoker.java Wed Jan  9 16:49:13 2013
@@ -40,6 +40,7 @@ import javax.xml.stream.XMLStreamExcepti
 import javax.xml.stream.XMLStreamReader;
 
 import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.workflow.execution.context.WorkflowContextHeaderBuilder;
 import org.apache.airavata.registry.api.exception.RegistryException;
 import org.apache.airavata.common.utils.XMLUtil;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
@@ -63,6 +64,7 @@ import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMFactory;
 import org.apache.axiom.om.OMNamespace;
 import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xmlpull.v1.builder.XmlElement;
@@ -72,9 +74,9 @@ import xsul.wsif.WSIFMessage;
 import xsul.wsif.impl.WSIFMessageElement;
 import xsul.xwsif_runtime.WSIFClient;
 
-public class EmbeddedGFacInvoker implements Invoker{
+public class EmbeddedGFacInvoker implements Invoker {
 
-      private static final Logger logger = LoggerFactory.getLogger(EmbeddedGFacInvoker.class);
+    private static final Logger logger = LoggerFactory.getLogger(EmbeddedGFacInvoker.class);
 
     private String nodeID;
 
@@ -120,19 +122,15 @@ public class EmbeddedGFacInvoker impleme
 
     private Object outPut;
 
-    Map<Parameter,ActualParameter> actualParameters = new HashMap<Parameter,ActualParameter>();
+    Map<Parameter, ActualParameter> actualParameters = new HashMap<Parameter, ActualParameter>();
 
     /**
      * Creates an InvokerWithNotification.
      *
      * @param portTypeQName
-     *
-     * @param wsdlLocation
-     *            The URL of WSDL of the service to invoke
-     * @param nodeID
-     *            The ID of the service
-     * @param notifier
-     *            The notification sender
+     * @param wsdlLocation  The URL of WSDL of the service to invoke
+     * @param nodeID        The ID of the service
+     * @param notifier      The notification sender
      */
     public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, WorkflowNotifiable notifier) {
         this(portTypeQName, wsdlLocation, nodeID, null, notifier);
@@ -142,18 +140,13 @@ public class EmbeddedGFacInvoker impleme
      * Creates an InvokerWithNotification.
      *
      * @param portTypeQName
-     *
-     * @param wsdlLocation
-     *            The URL of WSDL of the service to invoke
-     * @param nodeID
-     *            The ID of the service
-     * @param gfacURL
-     *            The URL of GFac service.
-     * @param notifier
-     *            The notification sender
+     * @param wsdlLocation  The URL of WSDL of the service to invoke
+     * @param nodeID        The ID of the service
+     * @param gfacURL       The URL of GFac service.
+     * @param notifier      The notification sender
      */
     public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String gfacURL,
-            WorkflowNotifiable notifier) {
+                               WorkflowNotifiable notifier) {
         this(portTypeQName, wsdlLocation, nodeID, null, gfacURL, notifier);
     }
 
@@ -161,19 +154,14 @@ public class EmbeddedGFacInvoker impleme
      * Creates an InvokerWithNotification.
      *
      * @param portTypeQName
-     *
-     * @param wsdlLocation
-     *            The URL of WSDL of the service to invoke
-     * @param nodeID
-     *            The ID of the service
+     * @param wsdlLocation  The URL of WSDL of the service to invoke
+     * @param nodeID        The ID of the service
      * @param messageBoxURL
-     * @param gfacURL
-     *            The URL of GFac service.
-     * @param notifier
-     *            The notification sender
+     * @param gfacURL       The URL of GFac service.
+     * @param notifier      The notification sender
      */
     public EmbeddedGFacInvoker(QName portTypeQName, String wsdlLocation, String nodeID, String messageBoxURL,
-            String gfacURL, WorkflowNotifiable notifier) {
+                               String gfacURL, WorkflowNotifiable notifier) {
         this.nodeID = nodeID;
         this.portTypeQName = portTypeQName;
         this.wsdlLocation = wsdlLocation;
@@ -186,7 +174,6 @@ public class EmbeddedGFacInvoker impleme
     }
 
     /**
-     *
      * @param portTypeQName
      * @param wsdl
      * @param nodeID
@@ -220,7 +207,6 @@ public class EmbeddedGFacInvoker impleme
     }
 
     /**
-     *
      * @throws WorkflowException
      */
     public void setup() throws WorkflowException {
@@ -231,20 +217,15 @@ public class EmbeddedGFacInvoker impleme
     }
 
     /**
-     *
-     * @param operationName
-     *            The name of the operation
+     * @param operationName The name of the operation
      * @throws WorkflowException
      */
     public void setOperation(String operationName) throws WorkflowException {
     }
 
     /**
-     *
-     * @param name
-     *            The name of the input parameter
-     * @param value
-     *            The value of the input parameter
+     * @param name  The name of the input parameter
+     * @param value The value of the input parameter
      * @throws WorkflowException
      */
     public void setInput(String name, Object value) throws WorkflowException {
@@ -255,16 +236,20 @@ public class EmbeddedGFacInvoker impleme
             this.inputNames.add(name);
             this.inputValues.add(value);
             ServiceDescription serviceDescription = airavataAPI.getApplicationManager().getServiceDescription(this.serviceName);
-            if(serviceDescription==null){
-            	throw new RegistryException(new Exception("Service Description not found in registry."));
+            if (serviceDescription == null) {
+                throw new RegistryException(new Exception("Service Description not found in registry."));
             }
             ServiceDescriptionType serviceDescriptionType = serviceDescription.getType();
             for (Parameter parameter : serviceDescriptionType.getInputParametersArray()) {
                 //todo this implementation doesn't work when there are n number of nodes connecting .. need to fix
-                XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(XMLUtil.xmlElementToString((XmlElement) value)));
-                StAXOMBuilder builder = new StAXOMBuilder(reader);
-                OMElement input = builder.getDocumentElement();
-                actualParameters.put(parameter, GfacUtils.getInputActualParameter(parameter, input));
+                if (value instanceof XmlElement) {
+                    XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(XMLUtil.xmlElementToString((XmlElement) value)));
+                    StAXOMBuilder builder = new StAXOMBuilder(reader);
+                    OMElement input = builder.getDocumentElement();
+                    actualParameters.put(parameter, GfacUtils.getInputActualParameter(parameter, input));
+                } else if (value instanceof String) {
+                    actualParameters.put(parameter, GfacUtils.getInputActualParameter(parameter, AXIOMUtil.stringToOM("<value>" + value + "</value>")));
+                }
             }
         } catch (RuntimeException e) {
             logger.error(e.getMessage(), e);
@@ -284,98 +269,67 @@ public class EmbeddedGFacInvoker impleme
     }
 
     /**
-     *
      * @return
      * @throws WorkflowException
      */
     public synchronized boolean invoke() throws WorkflowException {
         try {
-            ExecutorService executor = Executors.newSingleThreadExecutor();
-            this.result = executor.submit(new Callable<Boolean>() {
-                @SuppressWarnings("boxing")
-                public Boolean call() {
-                    try {
-                        JobContext jobContext = new JobContext(actualParameters,EmbeddedGFacInvoker.this.topic,
-                                EmbeddedGFacInvoker.this.serviceName,EmbeddedGFacInvoker.this.gfacURL);
-                        GFacConfiguration gFacConfiguration = new GFacConfiguration(EmbeddedGFacInvoker.this.configuration.getMyProxyServer(),
-                                EmbeddedGFacInvoker.this.configuration.getMyProxyUsername(),
-                            EmbeddedGFacInvoker.this.configuration.getMyProxyPassphrase(),EmbeddedGFacInvoker.this.configuration.getMyProxyLifetime(),
-                                EmbeddedGFacInvoker.this.airavataAPI, EmbeddedGFacInvoker.this.configuration.getTrustedCertLocation());
-
-                        GfacAPI gfacAPI1 = new GfacAPI();
-                        InvocationContext defaultInvocationContext = gfacAPI1.gridJobSubmit(jobContext, gFacConfiguration);
-                        ParameterContextImpl outputParamContext = (ParameterContextImpl) defaultInvocationContext
-                                .<ActualParameter>getMessageContext("output");
-                        if (outputParamContext.getNames().hasNext()) {
-                            /*
-                            * Process Output
-                            */
-                            OMFactory fac = OMAbstractFactory.getOMFactory();
-                            OMNamespace omNs = fac.createOMNamespace("http://ws.apache.org/axis2/xsd", "ns1");
-                            OMElement outputElement = fac.createOMElement("invokeResponse", omNs);
-
-                            for (Iterator<String> iterator = outputParamContext.getNames(); iterator.hasNext(); ) {
-                                String name = iterator.next();
-                                String outputString = outputParamContext.getValue(name).toXML().replaceAll("GFacParameter", name);
-                                XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(outputString));
-                                StAXOMBuilder builder = new StAXOMBuilder(reader);
-                                outputElement.addChild(builder.getDocumentElement());
-                            }
-                            // Send notification
-                            logger.info("outputMessage: " + outputElement.toString());
-                            outPut = new WSIFMessageElement(XMLUtil.stringToXmlElement3(outputElement.toStringWithConsume()));
-                            EmbeddedGFacInvoker.this.notifier.serviceFinished(new WSIFMessageElement((XmlElement)outPut));
-                        } else {
-                            // An implementation of WSIFMessage,
-                            // WSIFMessageElement, implements toString(), which
-                            // serialize the message XML.
-                            EmbeddedGFacInvoker.this.notifier.receivedFault(new WSIFMessageElement(XMLUtil.stringToXmlElement3("<Message>Invocation Failed</Message>")));
-                            EmbeddedGFacInvoker.this.failerSent = true;
-                        }
-                        return true;
-                    } catch (WorkflowException e) {
-                        logger.error(e.getMessage(), e);
-                        // An appropriate message has been set in the exception.
-                        EmbeddedGFacInvoker.this.notifier.invocationFailed(e.getMessage(), e);
-                        EmbeddedGFacInvoker.this.failerSent = true;
-                        throw new WorkflowRuntimeException(e);
-                    } catch (RuntimeException e) {
-                        logger.error(e.getMessage(), e);
-                        String message = "Error in invoking a service: " + EmbeddedGFacInvoker.this.serviceInformation;
-                        EmbeddedGFacInvoker.this.notifier.invocationFailed(message, e);
-                        EmbeddedGFacInvoker.this.failerSent = true;
-                        throw e;
-                    } catch (XMLStreamException e) {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    } catch (Exception e) {
-                        e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-                    }
-                    return false;
+            JobContext jobContext = new JobContext(actualParameters, EmbeddedGFacInvoker.this.topic,
+                    EmbeddedGFacInvoker.this.serviceName, EmbeddedGFacInvoker.this.gfacURL);
+            GFacConfiguration gFacConfiguration = new GFacConfiguration(EmbeddedGFacInvoker.this.configuration.getMyProxyServer(),
+                    EmbeddedGFacInvoker.this.configuration.getMyProxyUsername(),
+                    EmbeddedGFacInvoker.this.configuration.getMyProxyPassphrase(), EmbeddedGFacInvoker.this.configuration.getMyProxyLifetime(),
+                    EmbeddedGFacInvoker.this.airavataAPI, EmbeddedGFacInvoker.this.configuration.getTrustedCertLocation());
+
+            GfacAPI gfacAPI1 = new GfacAPI();
+            InvocationContext defaultInvocationContext = gfacAPI1.gridJobSubmit(jobContext,
+                    gFacConfiguration,this.nodeID,this.notifier.getWorkflowID().toASCIIString());
+            ParameterContextImpl outputParamContext = (ParameterContextImpl) defaultInvocationContext
+                    .<ActualParameter>getMessageContext("output");
+            if (outputParamContext.getNames().hasNext()) {
+                /*
+                * Process Output
+                */
+                OMFactory fac = OMAbstractFactory.getOMFactory();
+                OMNamespace omNs = fac.createOMNamespace("http://ws.apache.org/axis2/xsd", "ns1");
+                OMElement outputElement = fac.createOMElement("invokeResponse", omNs);
+
+                for (Iterator<String> iterator = outputParamContext.getNames(); iterator.hasNext(); ) {
+                    String name = iterator.next();
+                    String outputString = outputParamContext.getValue(name).toXML().replaceAll("GFacParameter", name);
+                    XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(outputString));
+                    StAXOMBuilder builder = new StAXOMBuilder(reader);
+                    outputElement.addChild(builder.getDocumentElement());
                 }
-            });
-            // Kill the thread inside of executor. This is necessary for Jython
-            // script to finish.
-//            executor.shutdown();
+                // Send notification
+                logger.info("outputMessage: " + outputElement.toString());
+                outPut = new WSIFMessageElement(XMLUtil.stringToXmlElement3(outputElement.toStringWithConsume()));
+                EmbeddedGFacInvoker.this.notifier.serviceFinished(new WSIFMessageElement((XmlElement) outPut));
+            } else {
+                // An implementation of WSIFMessage,
+                // WSIFMessageElement, implements toString(), which
+                // serialize the message XML.
+                EmbeddedGFacInvoker.this.notifier.receivedFault(new WSIFMessageElement(XMLUtil.stringToXmlElement3("<Message>Invocation Failed</Message>")));
+                EmbeddedGFacInvoker.this.failerSent = true;
+            }
 
-            // Let other threads know that job has been submitted.
-            notifyAll();
 
             // Check if the invocation itself fails. This happens immediately.
-            try {
-                this.result.get(100, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                logger.error(e.getMessage(), e);
-            } catch (TimeoutException e) {
-                // The job is probably running fine.
-                // The normal case.
-                return true;
-            } catch (ExecutionException e) {
-                // The service-failed notification should have been sent
-                // already.
-                logger.error(e.getMessage(), e);
-                String message = "Error in invoking a service: " + this.serviceInformation;
-                throw new WorkflowException(message, e);
-            }
+//            try {
+//                this.result.get(100, TimeUnit.MILLISECONDS);
+//            } catch (InterruptedException e) {
+//                logger.error(e.getMessage(), e);
+//            } catch (TimeoutException e) {
+//                // The job is probably running fine.
+//                // The normal case.
+//                return true;
+//            } catch (ExecutionException e) {
+//                // The service-failed notification should have been sent
+//                // already.
+//                logger.error(e.getMessage(), e);
+//                String message = "Error in invoking a service: " + this.serviceInformation;
+//                throw new WorkflowException(message, e);
+//            }
         } catch (RuntimeException e) {
             logger.error(e.getMessage(), e);
             String message = "Error in invoking a service: " + this.serviceInformation;
@@ -393,7 +347,6 @@ public class EmbeddedGFacInvoker impleme
     }
 
     /**
-     *
      * @throws WorkflowException
      */
     @SuppressWarnings("boxing")
@@ -439,16 +392,14 @@ public class EmbeddedGFacInvoker impleme
     }
 
     /**
-     *
-     * @param name
-     *            The name of the output parameter
+     * @param name The name of the output parameter
      * @return
      * @throws WorkflowException
      */
     public Object getOutput(String name) throws WorkflowException {
         try {
             waitToFinish();
-            return  outPut;
+            return outPut;
         } catch (WorkflowException e) {
             logger.error(e.getMessage(), e);
             // An appropriate message has been set in the exception.
@@ -470,7 +421,6 @@ public class EmbeddedGFacInvoker impleme
     }
 
     /**
-     *
      * @return
      * @throws WorkflowException
      */