You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2011/09/14 19:58:04 UTC

svn commit: r1170747 - /incubator/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java

Author: lahiru
Date: Wed Sep 14 17:58:04 2011
New Revision: 1170747

URL: http://svn.apache.org/viewvc?rev=1170747&view=rev
Log:
adding notification funcationality to gfac-axis2.

Modified:
    incubator/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java

Modified: incubator/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java?rev=1170747&r1=1170746&r2=1170747&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java (original)
+++ incubator/airavata/trunk/modules/gfac-axis2/src/main/java/org/apache/airavata/services/gfac/axis2/reciever/GFacMessageReciever.java Wed Sep 14 17:58:04 2011
@@ -32,6 +32,7 @@ import javax.xml.stream.XMLInputFactory;
 import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamReader;
 
+import org.apache.airavata.core.gfac.notification.impl.WorkflowTrackingNotification;
 import org.apache.airavata.registry.api.Axis2Registry;
 import org.apache.airavata.commons.gfac.type.Parameter;
 import org.apache.airavata.commons.gfac.type.ServiceDescription;
@@ -53,6 +54,7 @@ import org.apache.axiom.om.OMNamespace;
 import org.apache.axiom.om.impl.builder.StAXOMBuilder;
 import org.apache.axiom.soap.SOAPEnvelope;
 import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeader;
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.ConfigurationContext;
@@ -63,6 +65,7 @@ import org.apache.axis2.util.MessageCont
 import org.apache.axis2.util.Utils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.jackrabbit.core.data.AbstractDataRecord;
 import org.xmlpull.v1.builder.XmlDocument;
 import org.xmlpull.v1.builder.XmlInfosetBuilder;
 
@@ -78,34 +81,34 @@ public class GFacMessageReciever impleme
         GFacServiceOperations operation = GFacServiceOperations.valueFrom(axisRequestMsgCtx.getOperationContext()
                 .getOperationName());
         switch (operation) {
-        case GETABSTRACTWSDL:
-            try {
-                log.debug("invoking getAbstractWSDL operation");
-                processgetAbstractWSDLOperation(axisRequestMsgCtx);
-                log.debug("getAbstractWSDL operation invoked");
-            } catch (Exception e) {
-                throw new AxisFault("Error retrieving the WSDL", e);
-            }
+            case GETABSTRACTWSDL:
+                try {
+                    log.debug("invoking getAbstractWSDL operation");
+                    processgetAbstractWSDLOperation(axisRequestMsgCtx);
+                    log.debug("getAbstractWSDL operation invoked");
+                } catch (Exception e) {
+                    throw new AxisFault("Error retrieving the WSDL", e);
+                }
 
-            break;
-        case INVOKE:
-            try {
-                log.debug("invoking Invoke operation");
-                processInvokeOperation(axisRequestMsgCtx);
-                log.info("Invoke operation invoked !!");
-            } catch (Exception e) {
-                throw new AxisFault("Error Invoking the service", e);
-            }
-            break;
-        case GETWSDL:
-            try {
-                log.debug("invoking getAbstractWSDL operation");
-                processgetWSDLOperation(axisRequestMsgCtx);
-                log.info("getWSDL operation invoked !!");
-            } catch (Exception e) {
-                throw new AxisFault("Error retrieving the WSDL", e);
-            }
-            break;
+                break;
+            case INVOKE:
+                try {
+                    log.debug("invoking Invoke operation");
+                    processInvokeOperation(axisRequestMsgCtx);
+                    log.info("Invoke operation invoked !!");
+                } catch (Exception e) {
+                    throw new AxisFault("Error Invoking the service", e);
+                }
+                break;
+            case GETWSDL:
+                try {
+                    log.debug("invoking getAbstractWSDL operation");
+                    processgetWSDLOperation(axisRequestMsgCtx);
+                    log.info("getWSDL operation invoked !!");
+                } catch (Exception e) {
+                    throw new AxisFault("Error retrieving the WSDL", e);
+                }
+                break;
         }
     }
 
@@ -113,8 +116,6 @@ public class GFacMessageReciever impleme
         MessageContext response = null;
         String serviceName = getOriginalServiceName(messageContext);
         try {
-            ConfigurationContext context = messageContext.getConfigurationContext();
-
             /*
              * We assume that input likes <invoke>
              * <input_param_name1>value</input_param_name1>
@@ -129,7 +130,7 @@ public class GFacMessageReciever impleme
              * <output_param_name2>value</output_param_name2>
              * <output_param_name3>value</output_param_name3> </invokeResponse>
              */
-            OMElement output = invokeApplication(serviceName, invoke, context);
+            OMElement output = invokeApplication(serviceName, invoke, messageContext);
 
             SOAPFactory sf = OMAbstractFactory.getSOAP11Factory();
             SOAPEnvelope responseEnv = sf.createSOAPEnvelope();
@@ -144,12 +145,15 @@ public class GFacMessageReciever impleme
         }
     }
 
-    private OMElement invokeApplication(String serviceName, OMElement input, ConfigurationContext context)
+    private OMElement invokeApplication(String serviceName, OMElement input, MessageContext messageContext)
             throws Exception {
+        ConfigurationContext context = messageContext.getConfigurationContext();
+        String brokerURL = getEventBrokerURL(messageContext);
+        String topic = getTopic(messageContext);
         OMElement outputElement = null;
         try {
-            LoggingNotification notification = new LoggingNotification();
-
+            WorkflowTrackingNotification notification = new WorkflowTrackingNotification(brokerURL,topic);
+//            LoggingNotification notification = new LoggingNotification();
             DefaultInvocationContext ct = new DefaultInvocationContext();
             ct.setExecutionContext(new DefaultExecutionContext());
             ct.setServiceName(serviceName);
@@ -204,8 +208,8 @@ public class GFacMessageReciever impleme
             outputElement = fac.createOMElement("invokeResponse", omNs);
 
             ParameterContextImpl paramContext = (ParameterContextImpl) ct
-                    .<AbstractParameter> getMessageContext("output");
-            for (Iterator<String> iterator = paramContext.getNames(); iterator.hasNext();) {
+                    .<AbstractParameter>getMessageContext("output");
+            for (Iterator<String> iterator = paramContext.getNames(); iterator.hasNext(); ) {
                 String name = iterator.next();
                 OMElement ele = fac.createOMElement(name, omNs);
                 ele.addAttribute("type", paramContext.getValue(name).getType().toString(), omNs);
@@ -380,23 +384,23 @@ public class GFacMessageReciever impleme
 
     /**
      * Get Abstract WSDL and build it as OMElement
-     * 
+     *
      * @param context
      * @param serviceName
      * @return
      * @throws XMLStreamException
      */
-    private OMElement getWSDL(ConfigurationContext context, String serviceName) throws XMLStreamException{
+    private OMElement getWSDL(ConfigurationContext context, String serviceName) throws XMLStreamException {
         String WSDL = getRegistry(context).getWSDL(serviceName);
         XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(new StringReader(WSDL));
         StAXOMBuilder builder = new StAXOMBuilder(reader);
         OMElement wsdlElement = builder.getDocumentElement();
         return wsdlElement;
     }
-    
+
     /**
      * Get Registry Object in the configuration context
-     * 
+     *
      * @param context
      * @return
      */
@@ -414,4 +418,21 @@ public class GFacMessageReciever impleme
         return values[0];
     }
 
+    private String getEventBrokerURL(MessageContext context) {
+        SOAPHeader header = context.getEnvelope().getHeader();
+        OMElement contextHeader = header.getFirstChildWithName(new QName("http://lead.extreme.indiana.edu/namespaces/2005/10/lead-context-header", "context"));
+        OMElement eventSink = contextHeader.getFirstChildWithName(new QName("http://lead.extreme.indiana.edu/namespaces/2005/10/lead-context-header", "event-sink-epr"));
+        String address = eventSink.getFirstChildWithName(new QName("http://www.w3.org/2005/08/addressing","Address")).getText();
+        return address;
+    }
+
+
+    private String getTopic(MessageContext context) {
+        SOAPHeader header = context.getEnvelope().getHeader();
+        OMElement contextHeader = header.getFirstChildWithName(new QName("http://lead.extreme.indiana.edu/namespaces/2005/10/lead-context-header", "context"));
+        OMElement workflowId = contextHeader.getFirstChildWithName(new QName("http://lead.extreme.indiana.edu/namespaces/2005/10/lead-context-header", "workflow-instance-id"));
+        String topic = workflowId.getText().substring(1);
+        return topic;
+    }
+
 }