You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:27 UTC

[85/90] [abbrv] AIRAVATA-1124

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
new file mode 100644
index 0000000..9ddadb3
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
@@ -0,0 +1,301 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.subscription;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.util.OMElementComparator;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SubscriptionState {
+
+    private final Logger logger = LoggerFactory.getLogger(SubscriptionState.class);
+
+    private long creationTime = 0;
+    private long lastAvailableTime = 0;
+
+    private int unAvailableCounter = 0;
+    private boolean isNeverExpire = false;
+    private boolean isWsrmPolicy;
+
+    public String subId;
+    public String curNotif;
+    private String localTopicString;
+    private String xpathString;
+    private String subscribeXml;
+
+    ConsumerInfo consumerInfo = null;
+
+    EndpointReference consumerReference;
+
+    URI consumerURI = null;
+
+    private OutGoingQueue outGoingQueue;
+
+    /**
+     * @return Returns the creationTime.
+     */
+    public long getCreationTime() {
+        return creationTime;
+    }
+
+    /**
+     * @param creationTime
+     *            The creationTime to set.
+     */
+    public void setCreationTime(long creationTime) {
+        this.creationTime = creationTime;
+    }
+
+    public void setId(String id) {
+        subId = id;
+    }
+
+    public String getId() {
+        return subId;
+    }
+
+    public boolean isWsrmPolicy() {
+        return isWsrmPolicy;
+    }
+
+    public void setWsrmPolicy(boolean wsrmPolicy) {
+        this.isWsrmPolicy = wsrmPolicy;
+    }
+
+    // TODO: outGoingQueue is not belong to this class. Move it to elsewhere
+    // related to notification handler in wsntAdapter.
+    public SubscriptionState(EndpointReference consumerRef, boolean useNotify, boolean wsrmEnabled, String topic,
+            String xpath, String type, OutGoingQueue outGoingQueue) {
+        this.consumerReference = consumerRef;
+        try {
+            this.consumerURI = new URI(consumerRef.getAddress());
+        } catch (URISyntaxException e) {
+            // this should not happen
+            logger.error("invalid consumer URI returned by axis om", e);
+
+        }
+        this.outGoingQueue = outGoingQueue;
+        // if (topic == null) {
+        // throw new IllegalArgumentException();
+        // }
+        this.localTopicString = topic;
+        this.xpathString = xpath;
+        this.isWsrmPolicy = wsrmEnabled;
+        consumerInfo = new ConsumerInfo(consumerRef.getAddress(), type, useNotify, false);
+
+    }
+
+    public void resume() {
+        consumerInfo.setPaused(false);
+    }
+
+    public void pause() {
+        consumerInfo.setPaused(true);
+    }
+
+    public String getConsumerIPAddressStr() {
+        return consumerURI.toString();
+    }
+
+    public URI getConsumerAddressURI() {
+        return consumerURI;
+    }
+
+    public String getLocalTopic() {
+        // QName topicExpressionQName =
+        // xsul.util.XsulUtil.toQName(localTopicString, localTopicString
+        // .requiredTextContent());
+        // String topicLocalString = topicExpressionQName.getLocalPart();
+        return localTopicString;
+    }
+
+    /**
+     * @return Returns the consumeReference.
+     */
+    public EndpointReference getConsumerReference() {
+        return consumerReference;
+    }
+
+    /**
+     * @return Returns the curNotif.
+     */
+    public String getCurNotif() {
+        return curNotif;
+    }
+
+    /**
+     * @param curNotif
+     *            The curNotif to set.
+     */
+    public void setCurNotif(String curNotif) {
+        this.curNotif = curNotif;
+    }
+
+    /**
+     * @return Returns the outGoingQueue.
+     */
+    public OutGoingQueue getOutGoingQueue() {
+        return outGoingQueue;
+    }
+
+    /**
+     * @param outGoingQueue
+     *            The outGoingQueue to set.
+     */
+    public void setOutGoingQueue(OutGoingQueue outGoingQueue) {
+        this.outGoingQueue = outGoingQueue;
+    }
+
+    /**
+     * @return Returns the consumerInfo.
+     */
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+    public void resetUnAvailableCounter() {
+        unAvailableCounter = 0;
+    }
+
+    public int addUnAvailableCounter() {
+        unAvailableCounter++;
+        return unAvailableCounter;
+    }
+
+    /**
+     * @return Returns the unAvailableCounter.
+     */
+    public int getUnAvailableCounter() {
+        return unAvailableCounter;
+    }
+
+    public String getXpathString() {
+        return xpathString;
+    }
+
+    public void setXpathString(String xpathString) {
+        this.xpathString = xpathString;
+    }
+
+    /**
+     * @return Returns the isNeverExpire.
+     */
+    public boolean isNeverExpire() {
+        return isNeverExpire;
+    }
+
+    /**
+     * @param isNeverExpire
+     *            The isNeverExpire to set.
+     */
+    public void setNeverExpire(boolean neverExpire) {
+        this.isNeverExpire = neverExpire;
+    }
+
+    /**
+     * @return Returns the lastAvailableTime.
+     */
+    public long getLastAvailableTime() {
+        return lastAvailableTime;
+    }
+
+    /**
+     * @param lastAvailableTime
+     *            The lastAvailableTime to set.
+     */
+    public void setLastAvailableTime(long lastAvailableTime) {
+        this.lastAvailableTime = lastAvailableTime;
+
+    }
+
+    public void setSubscribeXml(String xml) {
+        subscribeXml = xml;
+    }
+
+    public String getSubscribeXml() {
+        return subscribeXml;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof SubscriptionState) {
+            SubscriptionState subscription = (SubscriptionState) o;
+            return BrokerUtil.sameStringValue(subscription.getLocalTopic(), this.getLocalTopic())
+                    && BrokerUtil.sameStringValue(subscription.getXpathString(), this.getXpathString())
+                    && BrokerUtil.sameStringValue(subscription.getConsumerIPAddressStr(),
+                            this.getConsumerIPAddressStr()) && equalReferenceParameters(subscription);
+        }
+        return false;
+    }
+
+    private boolean equalReferenceParameters(SubscriptionState anotherSubscription) {
+
+        Map<QName, OMElement> otherRefProperties = anotherSubscription.getConsumerReference()
+                .getAllReferenceParameters();
+        Map<QName, OMElement> myRefProperties = getConsumerReference().getAllReferenceParameters();
+
+        /*
+         * Basic comparison
+         */
+        if (otherRefProperties == null && myRefProperties == null) {
+            return true;
+        } else if (otherRefProperties == null || myRefProperties == null) {
+            return false;
+        } else if (otherRefProperties.size() != myRefProperties.size()) {
+            return false;
+        }
+
+        /*
+         * This OMElementComparator supports ignore list, but we don't use it here.
+         */
+        Iterator<Entry<QName, OMElement>> iterator = otherRefProperties.entrySet().iterator();
+        while (iterator.hasNext()) {
+
+            Entry<QName, OMElement> entry = iterator.next();
+            if (!myRefProperties.containsKey(entry.getKey())) {
+                return false;
+            }
+
+            OMElement myElement = myRefProperties.get(entry.getKey());
+            OMElement otherElement = entry.getValue();
+
+            if (!OMElementComparator.compare(myElement, otherElement)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
new file mode 100644
index 0000000..fe276bf
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.airavata.wsmg.util.WsEventingOperations;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSEProcessingContextBuilder extends ProcessingContextBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(WSEProcessingContextBuilder.class);
+
+    public ProcessingContext build(OMElement elem) {
+
+        ProcessingContext processingContext = new ProcessingContext();
+
+        if (elem != null && elem.getLocalName().equals("Subscribe")) {
+            logger.debug("found subscribe element");
+            onSubscription(processingContext, elem);
+
+        }
+
+        return processingContext;
+    }
+
+    public ProcessingContext build(SOAPEnvelope elem) {
+
+        ProcessingContext context = null;
+
+        SOAPBody soapBody = elem.getBody();
+        if (soapBody != null) {
+
+            context = build(soapBody.getFirstElement());
+
+        } else {
+            context = build((OMElement) null);
+        }
+
+        context.setEnvelope(elem);
+        extractInfoFromHeader(context, elem.getHeader());
+
+        return context;
+    }
+
+    public ProcessingContext build(MessageContext msgContext, WsEventingOperations operation) {
+
+        ProcessingContext processingContext = new ProcessingContext();
+
+        switch (operation) {
+        case SUBSCRIBE: {
+
+            Iterator<OMElement> iterator = msgContext.getEnvelope().getBody()
+                    .getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "Subscribe"));
+
+            if (!iterator.hasNext()) {
+                throw new RuntimeException("invalid subscription message - no subscribe element");
+            }
+
+            onSubscription(processingContext, iterator.next());
+        }
+            break;
+
+        }
+
+        processingContext.setMessageConext(msgContext);
+        processingContext.setEnvelope(msgContext.getEnvelope());
+        extractInfoFromHeader(processingContext, msgContext.getEnvelope().getHeader());
+        String topicFromUrl = BrokerUtil.getTopicFromRequestPath(msgContext.getTo().getAddress());
+
+        processingContext.setContextParameter(ContextParameters.TOPIC_FROM_URL, topicFromUrl);
+
+        return processingContext;
+    }
+
+    /**
+     * @param processingContext
+     * @param subscribeElement
+     */
+    private void onSubscription(ProcessingContext processingContext, OMElement subscribeElement) {
+
+        processingContext.setContextParameter(ContextParameters.SUBSCRIBE_ELEMENT, subscribeElement);
+
+        // -- check optional element - expires
+        Iterator iterator = subscribeElement.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
+                "Expires"));
+
+        if (iterator.hasNext()) {
+
+            processingContext.setContextParameter(ContextParameters.SUBSCRIBER_EXPIRES,
+                    ((OMElement) iterator.next()).getText());
+
+        }
+
+        iterator = subscribeElement
+                .getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "Filter"));
+
+        if (!iterator.hasNext()) {
+
+            throw new RuntimeException("invalid subscription - unable to find filter dialet");
+
+        }
+
+        processingContext.setContextParameter(ContextParameters.FILTER_ELEMENT, iterator.next());
+
+        iterator = subscribeElement.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
+                "Delivery"));
+
+        if (!iterator.hasNext()) {
+            throw new RuntimeException("invalid subscription - unable to find delivery tag");
+        }
+
+        OMElement delivery = (OMElement) iterator.next();
+
+        iterator = delivery.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "NotifyTo"));
+
+        if (!iterator.hasNext()) {
+            throw new RuntimeException("invalid subscription - unable to find NotifyTo tag");
+        }
+
+        OMElement notifyToElement = (OMElement) iterator.next();
+
+        processingContext.setContextParameter(ContextParameters.NOTIFY_TO_ELEMENT, notifyToElement);
+
+        try {
+
+            processingContext.setContextParameter(ContextParameters.NOTIFY_TO_EPR,
+                    EndpointReferenceHelper.fromOM(notifyToElement));
+
+        } catch (AxisFault e) {
+            throw new RuntimeException("invalid subscription - unable to parse notify to end point reference", e);
+        }
+
+    }
+
+    private void extractInfoFromHeader(ProcessingContext context, SOAPHeader header) {
+
+        Iterator ite = header.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
+                WsmgCommonConstants.SUBSCRIPTION_ID));
+
+        if (ite.hasNext()) {
+            OMElement identifier = (OMElement) ite.next();
+            logger.debug("extracted identifier " + identifier.getText());
+
+            context.setContextParameter(ContextParameters.SUB_ID, identifier.getText());
+
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
new file mode 100644
index 0000000..f17a980
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import java.util.Calendar;
+import java.util.Date;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSEProtocolSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(WSEProtocolSupport.class);
+
+    public SubscriptionState createSubscriptionState(ProcessingContext ctx, OutGoingQueue outgoingQueue)
+            throws AxisFault {
+
+        boolean neverExpire = false; // is true if expiration time is less than
+        String topicLocalString = "";
+        String xpathString = "";
+        EndpointReference consumerReference = ctx.getContextParameter(ContextParameters.NOTIFY_TO_EPR);
+
+        if (consumerReference == null) {
+            throw new AxisFault("Only Push delivery Mode (NotifyTo) is supported in WSE");
+        }
+
+        String expireTimeString = ctx.getContextParameter(ContextParameters.SUBSCRIBER_EXPIRES);
+
+        if (expireTimeString == null) {
+            neverExpire = true;
+
+        } else {
+
+            long expireTime = Long.valueOf(expireTimeString);
+            if (expireTime < 0) {
+                neverExpire = true;
+            }
+        }
+
+        OMElement filterEl = ctx.getContextParameter(ContextParameters.FILTER_ELEMENT);
+
+        if (filterEl == null) {
+
+            topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
+
+            if (topicLocalString == null) {
+                topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
+            }
+
+            log.debug("got topicLocalString=" + topicLocalString);
+            // topicLocalString = "wseTopic";
+            // // a special topic, used in WSNT and JMS. Do not use a
+            // wildcard topic here since wildcard string varies by system
+        } else {
+
+            String filterDialectAttrib = filterEl.getAttributeValue(new QName(null, "Dialect"));
+
+            if (filterDialectAttrib.compareTo(WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT) == 0) {
+                topicLocalString = BrokerUtil.getTopicLocalString(filterEl.getText()); // get what ever inside this
+                                                                                       // element
+
+                if (topicLocalString == null) {
+                    throw new AxisFault("topic is not given in the subscription");
+                }
+
+            } else if (filterDialectAttrib.compareTo(WsmgCommonConstants.XPATH_DIALECT) == 0) {
+
+                // use topicFromUrl if
+                // was provided
+                topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
+
+                xpathString = filterEl.getText();
+
+                log.debug("got topicLocalString=" + topicLocalString + " xpathString=" + xpathString);
+
+                // TODO: Add XPath canonicalization here in the parsing. To
+                // generate a
+                // canonicalized XPath string
+                // Possibly use Query query =
+                // XPQuery.parseQuery(xpathExpression, index);
+                if (xpathString == null) {
+                    throw new AxisFault("xpath expression is not given");
+                }
+            } else if (filterDialectAttrib.compareTo(WsmgCommonConstants.TOPIC_AND_XPATH_DIALECT) == 0) {
+                OMElement topicEl = filterEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+                        .getNamespaceURI(), "TopicExpression"));
+                if (topicEl != null) {
+                    topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
+                }
+                OMElement xpathEl = filterEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+                        .getNamespaceURI(), "MessageContent"));
+                if (xpathEl != null) {
+                    xpathString = xpathEl.getText();
+                    if (xpathString == null && topicLocalString == null) {
+                        throw new AxisFault("Both topic string and " + "XPath String are null!");
+                    }
+                }
+            } else {
+                throw new AxisFault("Unkown dialect: ");
+                // topicLocalString = "wseTopic"; //a special topic, used in
+                // WSNT and JMS
+            }
+
+        }
+
+        if (topicLocalString == null || topicLocalString.length() == 0) {
+            topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
+        }
+
+        // Create SubscriptionState Object
+        SubscriptionState state = new SubscriptionState(consumerReference, true, false, topicLocalString, xpathString,
+                "wse", outgoingQueue);
+
+        state.setNeverExpire(neverExpire); // default false
+
+        return state;
+    }
+
+    public void createSubscribeResponse(ProcessingContext ctx, String subId) throws AxisFault {
+
+        OMFactory factory = OMAbstractFactory.getOMFactory();
+
+        ctx.addResponseMsgNameSpaces(NameSpaceConstants.WSE_NS);
+
+        OMElement responseMessage = factory.createOMElement("SubscribeResponse", NameSpaceConstants.WSE_NS);
+
+        OMElement identifier = factory.createOMElement(WsmgCommonConstants.SUBSCRIPTION_ID,
+                responseMessage.getNamespace());
+        identifier.setText(subId);
+        EndpointReference serviceLocationEndpointReference = new EndpointReference(ctx.getMessageContext()
+                .getAxisService().getEndpointURL());
+
+        serviceLocationEndpointReference.addReferenceParameter(identifier);
+
+        OMElement expiresEl = factory.createOMElement("Expires", responseMessage.getNamespace(), responseMessage);
+
+        Date expiration = getFutureExpirationDate();
+        String dateString = CommonRoutines.getXsdDateTime(expiration);
+        expiresEl.setText(dateString);
+
+        OMElement subscriptionManagerEpr = null;
+        try {
+
+            subscriptionManagerEpr = EndpointReferenceHelper.toOM(factory, serviceLocationEndpointReference, new QName(
+                    NameSpaceConstants.WSE_NS.getNamespaceURI(), "SubscriptionManager"), NameSpaceConstants.WSA_NS
+                    .getNamespaceURI());
+
+            responseMessage.addChild(subscriptionManagerEpr);
+            subscriptionManagerEpr.setNamespace(responseMessage.getNamespace());
+        } catch (AxisFault e) {
+            log.error("unable to resolve EPR from OM", e);
+            throw e;
+        }
+
+        ctx.setRespMessage(responseMessage);
+    }
+
+    private Date getFutureExpirationDate() {
+        // Get a Calendar for current locale and time zone
+        Calendar cal = Calendar.getInstance();
+        // currentDate.setDate(currentDate.getDate()+1);
+        // Get a Date object that represents 30 days from now
+        Date currentDate = new Date(); // Current date
+        cal.setTime(currentDate); // Set it in the Calendar object
+        cal.add(Calendar.DATE, 30); // Add 30 days
+        Date expiration = cal.getTime(); // Retrieve the resulting date
+        return expiration;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
new file mode 100644
index 0000000..6af3bc5
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
+import org.apache.airavata.wsmg.util.WsEventingOperations;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BrokerServiceMessageReceiverInOut message receiver
+ */
+
+public class WSEventingMsgReceiver extends AbstractBrokerMsgReceiver {
+
+    private static final Logger log = LoggerFactory.getLogger(WSEventingMsgReceiver.class);
+    WSEProcessingContextBuilder builder = new WSEProcessingContextBuilder();
+
+    public MessageContext process(MessageContext inMsg, String operationName) throws AxisFault {
+
+        WsEventingOperations msgType = WsEventingOperations.valueFrom(operationName);
+        ProcessingContext processingContext = builder.build(inMsg, msgType);
+        MessageContext outputMsg = null;
+
+        log.debug("WS-Eventing: " + msgType);
+
+        switch (msgType) {
+        case SUBSCRIBE: {
+            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+            brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
+            outputMsg = createOutputMessageContext(inMsg, processingContext);
+            break;
+        }
+        case UNSUBSCRIBE: {
+            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+            brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
+            outputMsg = createOutputMessageContext(inMsg, processingContext);
+            break;
+        }
+        case RENEW:
+        case GET_STATUS:
+        case SUBSCRIPTION_END:
+        default:
+            throw new AxisFault("unsupported operation" + msgType.toString());
+
+        }
+        return outputMsg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
new file mode 100644
index 0000000..63c6bbc
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wseventing;
+
+import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
+import org.apache.airavata.wsmg.util.WsEventingOperations;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+
+public class WSEventingPublishMsgReceiver extends AbstractBrokerMsgReceiver {
+
+    WSEProcessingContextBuilder builder = new WSEProcessingContextBuilder();
+
+    @Override
+    protected MessageContext process(MessageContext inMsgContext, String operationName) throws AxisFault {
+
+        ProcessingContext processingContext = builder.build(inMsgContext, WsEventingOperations.PUBLISH);
+
+        try {
+
+            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsgContext
+                    .getConfigurationContext().getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+            brokerConfigContext.getNotificationProcessor().processMsg(processingContext, NameSpaceConstants.WSE_NS);
+        } catch (Exception e) {
+            throw new AxisFault("unable to process message", e);
+        }
+        return createOutputMessageContext(inMsgContext, processingContext);
+    }
+
+    @Override
+    protected MessageContext createOutputMessageContext(MessageContext inMsg, ProcessingContext processingContext)
+            throws AxisFault {
+
+        MessageContext outputContext = null;
+
+        OMElement responseMessage = processingContext.getRespMessage();
+        if (responseMessage != null) {
+
+            outputContext = super.createOutputMessageContext(inMsg, processingContext);
+
+            String responseAction = String.format("%s/%s", NameSpaceConstants.WSE_NS.getNamespaceURI(),
+                    responseMessage.getLocalName());
+
+            outputContext.setSoapAction(responseAction);
+        }
+
+        return outputContext;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
new file mode 100644
index 0000000..c8d3e37
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
@@ -0,0 +1,237 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wsnotification;
+
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSNTProtocolSupport {
+
+    private static final Logger log = LoggerFactory.getLogger(WSNTProtocolSupport.class);
+
+    public SubscriptionState createSubscriptionState(ProcessingContext ctx, OutGoingQueue outgoingQueue)
+            throws AxisFault {
+
+        EndpointReference consumerReference = ctx.getContextParameter(ContextParameters.NOTIFY_TO_EPR);
+
+        if (consumerReference == null) {
+            throw new AxisFault("Only Push delivery Mode (NotifyTo) is supported");
+        }
+
+        boolean neverExpire = false; // is true if expiration time is less than
+        boolean useNotify = true; // notify by event notifications
+        boolean wsrmEnabled = false;
+        String topicLocalString = "";
+        String xpathString = "";
+
+        String expireTimeString = ctx.getContextParameter(ContextParameters.SUBSCRIBER_EXPIRES);
+
+        if (expireTimeString == null) {
+            neverExpire = true;
+        } else {
+            long expireTime = Long.valueOf(expireTimeString);
+            if (expireTime < 0) {
+                neverExpire = true;
+            }
+        }
+
+        OMElement useNotifyEl = ctx.getContextParameter(ContextParameters.USE_NOTIFY_ELEMENT);
+        if (useNotifyEl != null) {
+            String s = useNotifyEl.getText();
+            useNotify = Boolean.valueOf(s);
+        }
+
+        // get policy if exist
+        OMElement element = ctx.getContextParameter(ContextParameters.SUB_POLICY);
+        if (element != null) {
+            wsrmEnabled = true;
+        }
+
+        OMElement topicExpressionEl = ctx.getContextParameter(ContextParameters.TOPIC_EXPRESSION_ELEMENT);
+
+        if (topicExpressionEl != null) {
+            topicLocalString = BrokerUtil.getTopicLocalString(topicExpressionEl.getText());
+        }
+
+        OMElement xpathEl = ctx.getContextParameter(ContextParameters.XPATH_ELEMENT);
+
+        if (xpathEl != null) {
+            xpathString = BrokerUtil.getXPathString(xpathEl);
+        }
+        if (xpathString == null && topicLocalString == null) {
+            throw new AxisFault("Both topic string and XPath String are null!");
+
+        }
+
+        if (topicLocalString == null || topicLocalString.length() == 0) {
+            topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
+        }
+
+        // Create SubscriptionState Object
+        SubscriptionState state = new SubscriptionState(consumerReference, useNotify, wsrmEnabled, topicLocalString,
+                xpathString, "wsnt", outgoingQueue);
+
+        state.setNeverExpire(neverExpire); // default false
+
+        return state;
+    }
+
+    public void createSubscribeResponse(ProcessingContext ctx, String subId) throws AxisFault {
+
+        OMFactory factory = OMAbstractFactory.getOMFactory();
+
+        ctx.addResponseMsgNameSpaces(NameSpaceConstants.WSNT_NS);
+        OMElement responseMessage = factory.createOMElement("SubscribeResponse", NameSpaceConstants.WSNT_NS);
+
+        OMElement identifier = factory.createOMElement(WsmgCommonConstants.SUBSCRIPTION_ID,
+                responseMessage.getNamespace());
+        identifier.setText(subId);
+        EndpointReference serviceLocationEndpointReference = new EndpointReference(ctx.getMessageContext()
+                .getAxisService().getEndpointURL());
+        serviceLocationEndpointReference.addReferenceParameter(identifier);
+
+        OMElement subscriptionReference = null;
+        try {
+            subscriptionReference = EndpointReferenceHelper.toOM(factory, serviceLocationEndpointReference, new QName(
+                    "SubscriptionReference"), NameSpaceConstants.WSA_NS.getNamespaceURI());
+
+            responseMessage.addChild(subscriptionReference);
+            subscriptionReference.setNamespace(responseMessage.getNamespace());
+
+        } catch (AxisFault e) {
+            log.error("unable to resolve EPR from OM", e);
+            throw e;
+        }
+
+        ctx.setRespMessage(responseMessage);
+    }
+
+    public static class Client {
+
+        public static OMElement createSubscriptionMsg(EndpointReference eventSinkLocation, String topicExpression,
+                String xpathExpression) throws AxisFault {
+            OMFactory factory = OMAbstractFactory.getOMFactory();
+
+            OMElement message = factory.createOMElement("SubscribeRequest", NameSpaceConstants.WSNT_NS);
+
+            if (topicExpression != null) {
+                OMElement topicExpEl = factory.createOMElement("TopicExpression", NameSpaceConstants.WSNT_NS, message);
+
+                topicExpEl.addAttribute("Dialect", WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT,
+                        NameSpaceConstants.WSNT_NS);
+                topicExpEl.declareNamespace(NameSpaceConstants.WIDGET_NS);
+                topicExpEl.setText(NameSpaceConstants.WIDGET_NS.getPrefix() + ":" + topicExpression);
+            }
+
+            if (xpathExpression != null) {
+                OMElement xpathExpEl = factory.createOMElement("Selector", NameSpaceConstants.WSNT_NS, message);
+                xpathExpEl.addAttribute("Dialect", WsmgCommonConstants.XPATH_DIALECT, null);
+                xpathExpEl.setText(xpathExpression);
+            }
+
+            OMElement useNotifyEl = factory.createOMElement("UseNotify", message.getNamespace(), message);
+            useNotifyEl.setText("true");// check wether we still need this
+
+            OMElement eprCrEl = EndpointReferenceHelper.toOM(factory, eventSinkLocation,
+                    new QName("ConsumerReference"), NameSpaceConstants.WSA_NS.getNamespaceURI());
+
+            message.addChild(eprCrEl);
+            eprCrEl.setNamespace(message.getNamespace());
+
+            return message;
+        }
+
+        public static String decodeSubscriptionResponse(OMElement subscriptionReference) throws AxisFault {
+
+            String subscriptionId = null;
+
+            EndpointReference subscriptionReferenceEPR = EndpointReferenceHelper.fromOM(subscriptionReference);
+
+            Map<QName, OMElement> referenceParams = subscriptionReferenceEPR.getAllReferenceParameters();
+
+            if (referenceParams != null) {
+                QName identifierQName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(),
+                        WsmgCommonConstants.SUBSCRIPTION_ID);
+
+                OMElement identifierEl = referenceParams.get(identifierQName);
+                subscriptionId = (identifierEl != null) ? identifierEl.getText() : null;
+
+            }
+
+            return subscriptionId;
+        }
+
+        public static OMElement createUnsubscribeMsg() {
+            OMFactory factory = OMAbstractFactory.getOMFactory();
+            OMElement message = factory.createOMElement("UnsubsribeRequest", NameSpaceConstants.WSNT_NS);
+
+            return message;
+        }
+
+        public static OMElement encodeNotification(String topic, OMElement message, EndpointReference producerReference)
+                throws AxisFault {
+            OMFactory factory = OMAbstractFactory.getOMFactory();
+
+            OMElement topicExpEl = factory.createOMElement("Topic", NameSpaceConstants.WSNT_NS);
+            topicExpEl.addAttribute("Dialect", WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT, null);
+            topicExpEl.declareNamespace(NameSpaceConstants.WIDGET_NS);
+            topicExpEl.setText(NameSpaceConstants.WIDGET_NS.getPrefix() + ":" + topic);
+
+            OMElement messageToNotify = factory.createOMElement("Notify", NameSpaceConstants.WSNT_NS);
+            messageToNotify.declareNamespace(NameSpaceConstants.WSNT_NS);
+            messageToNotify.declareNamespace(NameSpaceConstants.WSA_NS);
+            OMElement notificationMesssageEl = factory.createOMElement("NotificationMessage",
+                    messageToNotify.getNamespace(), messageToNotify);
+
+            notificationMesssageEl.addChild(topicExpEl);
+
+            notificationMesssageEl.addChild(EndpointReferenceHelper.toOM(factory, producerReference, new QName(
+                    notificationMesssageEl.getNamespace().getNamespaceURI(), "ProducerReference",
+                    notificationMesssageEl.getNamespace().getPrefix()), NameSpaceConstants.WSA_NS.getNamespaceURI()));
+
+            OMElement messageEl = factory.createOMElement("Message", notificationMesssageEl.getNamespace(),
+                    notificationMesssageEl);
+
+            messageEl.addChild(message);
+            return messageToNotify;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
new file mode 100644
index 0000000..9d32ee9
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wsnotification;
+
+import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
+import org.apache.airavata.wsmg.util.WsNotificationOperations;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * BrokerServiceMessageReceiverInOut message receiver
+ */
+
+public class WSNotificationMsgReceiver extends AbstractBrokerMsgReceiver {
+
+    private static final Logger log = LoggerFactory.getLogger(WSNotificationMsgReceiver.class);
+
+    WSNotificationProcessingContextBuilder builder = new WSNotificationProcessingContextBuilder();
+
+    public MessageContext process(MessageContext inMsg, String operationName) throws AxisFault {
+
+        WsNotificationOperations msgType = WsNotificationOperations.valueFrom(operationName);
+
+        ProcessingContext processingContext = builder.build(inMsg, msgType);
+
+        MessageContext outputMsg = null;
+
+        switch (msgType) {
+        case NOTIFY: {
+            try {
+
+                WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg
+                        .getConfigurationContext().getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+                brokerConfigContext.getNotificationProcessor()
+                        .processMsg(processingContext, NameSpaceConstants.WSNT_NS);
+            } catch (Exception e) {
+                throw new AxisFault("unable to process message", e);
+            }
+            outputMsg = createOutputMessageContext(inMsg, processingContext);
+            break;
+        }
+        case SUBSCRIBE: {
+
+            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+            brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
+            outputMsg = createOutputMessageContext(inMsg, processingContext);
+            break;
+        }
+        case UNSUBSCRIBE: {
+
+            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+            brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
+            outputMsg = createOutputMessageContext(inMsg, processingContext);
+            break;
+        }
+        case RESUME_SUBSCRIPTION: {
+
+            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+            brokerConfigContext.getSubscriptionManager().resumeSubscription(processingContext);
+            outputMsg = createOutputMessageContext(inMsg, processingContext);
+            break;
+        }
+        case PAUSE_SUBSCRIPTION: {
+            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
+                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
+
+            brokerConfigContext.getSubscriptionManager().pauseSubscription(processingContext);
+            outputMsg = createOutputMessageContext(inMsg, processingContext);
+            break;
+        }
+        case GET_CURRENT_MSG:
+        default:
+            throw new AxisFault("not implemented yet");
+        }
+
+        return outputMsg;
+    }
+
+}// end of class

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
new file mode 100644
index 0000000..b517234
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
@@ -0,0 +1,179 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.wsnotification;
+
+import java.util.Iterator;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.broker.context.ContextParameters;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.airavata.wsmg.util.WsNotificationOperations;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPHeader;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.addressing.EndpointReferenceHelper;
+import org.apache.axis2.context.MessageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WSNotificationProcessingContextBuilder extends ProcessingContextBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(WSNotificationProcessingContextBuilder.class);
+
+    public ProcessingContext build(OMElement elem) {
+
+        ProcessingContext processingContext = new ProcessingContext();
+
+        if (elem == null
+                || (!elem.getNamespace().getNamespaceURI().equals(NameSpaceConstants.WSNT_NS.getNamespaceURI()))) {
+
+            logger.warn("invalid message payload recieved: " + elem);
+
+            return processingContext;
+        }
+
+        String localName = elem.getLocalName();
+
+        if (localName.equals("SubscribeRequest")) {
+            onSubscription(processingContext, elem);
+        }
+        return processingContext;
+    }
+
+    public void onSubscription(ProcessingContext context, OMElement subscribeElement) {
+        context.setContextParameter(ContextParameters.SUBSCRIBE_ELEMENT, subscribeElement);
+
+        OMElement consumerReference = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+                .getNamespaceURI(), "ConsumerReference"));
+        if (consumerReference == null) {
+            logger.warn("unable to find consumer reference" + " in subscribe message: " + subscribeElement);
+            return;
+        }
+        context.setContextParameter(ContextParameters.NOTIFY_TO_ELEMENT, consumerReference);
+
+        try {
+            EndpointReference consumerEpr = EndpointReferenceHelper.fromOM(consumerReference);
+            context.setContextParameter(ContextParameters.NOTIFY_TO_EPR, consumerEpr);
+
+        } catch (AxisFault e) {
+            logger.warn("invalid epr", e);
+            return;
+        }
+
+        OMElement topicExpression = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+                .getNamespaceURI(), "TopicExpression"));
+
+        if (topicExpression != null) { // topic can be null
+
+            context.setContextParameter(ContextParameters.TOPIC_EXPRESSION_ELEMENT, topicExpression);
+
+        }
+
+        OMElement useNotify = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+                .getNamespaceURI(), "UseNotify"));
+
+        if (useNotify != null) {
+            context.setContextParameter(ContextParameters.USE_NOTIFY_ELEMENT, useNotify);
+        }
+
+        OMElement selector = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+                .getNamespaceURI(), "Selector"));
+
+        if (selector != null) {
+            context.setContextParameter(ContextParameters.XPATH_ELEMENT, selector);
+        }
+
+        OMElement subscriptionPolicy = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
+                .getNamespaceURI(), WsmgCommonConstants.SUBSCRIPTION_POLICY));
+
+        if (subscriptionPolicy != null) {
+            context.setContextParameter(ContextParameters.SUB_POLICY, subscriptionPolicy);
+        }
+
+    }
+
+    public ProcessingContext build(MessageContext msgContext, WsNotificationOperations operation) {
+        ProcessingContext context = new ProcessingContext();
+
+        SOAPEnvelope soapEnvelope = msgContext.getEnvelope();
+
+        if (soapEnvelope == null) {
+            throw new RuntimeException("invalid message context - envelope is not found");
+        }
+
+        SOAPBody soapBody = soapEnvelope.getBody();
+
+        if (soapBody == null) {
+            throw new RuntimeException("invalid message context - soap envelope is not found");
+        }
+
+        SOAPHeader soapHeader = soapEnvelope.getHeader();
+
+        if (soapHeader == null) {
+            throw new RuntimeException("invalid message context - soap header is not found");
+        }
+
+        switch (operation) {
+        case SUBSCRIBE: {
+
+            Iterator<OMElement> iterator = soapBody.getChildrenWithName(new QName(NameSpaceConstants.WSNT_NS
+                    .getNamespaceURI(), "SubscribeRequest"));
+            if (!iterator.hasNext()) {
+                throw new RuntimeException("invalid message context - unable to find Subscribe information");
+            }
+
+            onSubscription(context, iterator.next());
+        }
+            break;
+        }
+
+        context.setEnvelope(soapEnvelope);
+        extractInfoFromHeader(context, soapHeader);
+        context.setMessageConext(msgContext);
+        String topicFromUrl = BrokerUtil.getTopicFromRequestPath(msgContext.getTo().getAddress());
+        context.setContextParameter(ContextParameters.TOPIC_FROM_URL, topicFromUrl);
+
+        return context;
+    }
+
+    private void extractInfoFromHeader(ProcessingContext context, SOAPHeader header) {
+
+        Iterator ite = header.getChildrenWithName(new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(),
+                WsmgCommonConstants.SUBSCRIPTION_ID));
+        if (ite.hasNext()) {
+            OMElement identifier = (OMElement) ite.next();
+            logger.debug("extracted identifier " + identifier.getText());
+
+            context.setContextParameter(ContextParameters.SUB_ID, identifier.getText());
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
new file mode 100644
index 0000000..73eba17
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+public class OutGoingMessage implements Serializable {
+    /**
+	 * 
+	 */
+    private static final long serialVersionUID = -6999667921413261492L;
+
+    String textMessage;
+
+    AdditionalMessageContent additionalMessageContent;
+
+    List<ConsumerInfo> consumerInfoList = null;
+
+    // ConsumerInfo consumerInfo=null;
+
+    /**
+	 * 
+	 */
+    public OutGoingMessage() {
+        // super();
+        // TODO Auto-generated constructor stub
+        // consumerInfo=new ConsumerInfo();
+    }
+
+    /**
+     * @param textMessage
+     * @param additionalMessageContent
+     * @param consumerInfoList
+     */
+    public OutGoingMessage(String textMessage, AdditionalMessageContent additionalMessageContent,
+            List<ConsumerInfo> consumerInfoList) {
+        super();
+        // TODO Auto-generated constructor stub
+        this.textMessage = textMessage;
+        this.additionalMessageContent = additionalMessageContent;
+        this.consumerInfoList = consumerInfoList;
+    }
+
+    /**
+     * @param consumerInfo
+     *            The consumerInfo to set.
+     */
+    public void addConsumerInfo(ConsumerInfo consumerInfo) {
+        // this.consumerInfo = consumerInfo;
+        if (consumerInfoList == null) {
+            consumerInfoList = new ArrayList<ConsumerInfo>();
+        }
+        consumerInfoList.add(consumerInfo);
+    }
+
+    /**
+     * @return Returns the textMessage.
+     */
+    public String getTextMessage() {
+        return textMessage;
+    }
+
+    /**
+     * @param textMessage
+     *            The textMessage to set.
+     */
+    public void setTextMessage(String textMessage) {
+        this.textMessage = textMessage;
+    }
+
+    /**
+     * @return Returns the consumerInfoList.
+     */
+    public List<ConsumerInfo> getConsumerInfoList() {
+        return consumerInfoList;
+    }
+
+    /**
+     * @param consumerInfoList
+     *            The consumerInfoList to set.
+     */
+    public void setConsumerInfoList(List<ConsumerInfo> consumerInfoList) {
+        this.consumerInfoList = consumerInfoList;
+    }
+
+    /**
+     * @return Returns the soapHeader.
+     */
+    public AdditionalMessageContent getAdditionalMessageContent() {
+        return additionalMessageContent;
+    }
+
+    /**
+     * @param soapHeader
+     *            The soapHeader to set.
+     */
+    public void setAdditionalMessageContent(AdditionalMessageContent soapHeader) {
+        this.additionalMessageContent = soapHeader;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
new file mode 100644
index 0000000..6b051b1
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+
+public class WsmgInMemoryStorage implements WsmgStorage, WsmgQueue {
+
+    private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+
+    private Map<String, SubscriptionState> expirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
+
+    private Map<String, SubscriptionState> unexpirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
+
+    public void dispose() {
+        queue.clear();
+        expirableSubscriptions.clear();
+        unexpirableSubscriptions.clear();
+    }
+
+    public int insert(SubscriptionState subscription) {
+        if (subscription.isNeverExpire()) {
+            unexpirableSubscriptions.put(subscription.getId(), subscription);
+        } else {
+            expirableSubscriptions.put(subscription.getId(), subscription);
+        }
+        return 0;
+    }
+
+    public int delete(String subscriptionId) {
+        expirableSubscriptions.remove(subscriptionId);
+        unexpirableSubscriptions.remove(subscriptionId);
+        return 0;
+    }
+
+    public List<SubscriptionEntry> getAllSubscription() {
+
+        List<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>(expirableSubscriptions.size()
+                + unexpirableSubscriptions.size());
+
+        Collection<SubscriptionState> entries = expirableSubscriptions.values();
+
+        for (SubscriptionState s : entries) {
+            SubscriptionEntry se = new SubscriptionEntry();
+            se.setSubscribeXml(s.getSubscribeXml());
+            se.setSubscriptionId(s.getId());
+            ret.add(se);
+        }
+        entries = unexpirableSubscriptions.values();
+        for (SubscriptionState s : entries) {
+            SubscriptionEntry se = new SubscriptionEntry();
+            se.setSubscribeXml(s.getSubscribeXml());
+            se.setSubscriptionId(s.getId());
+            ret.add(se);
+        }
+
+        return ret;
+    }
+
+    public Object blockingDequeue() {
+        Object obj = null;
+
+        try {
+            obj = queue.take();
+        } catch (InterruptedException ie) {
+            throw new RuntimeException("interruped exception occured", ie);
+        }
+
+        return obj;
+    }
+
+    public void cleanup() {
+        queue.clear();
+    }
+
+    public void enqueue(Object object, String trackId) {
+        queue.offer(object);
+    }
+}