You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:30:27 UTC

[25/90] [abbrv] [partial] AIRAVATA-1124

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
deleted file mode 100644
index 9ddadb3..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/subscription/SubscriptionState.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.subscription;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.commons.util.OMElementComparator;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.addressing.EndpointReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SubscriptionState {
-
-    private final Logger logger = LoggerFactory.getLogger(SubscriptionState.class);
-
-    private long creationTime = 0;
-    private long lastAvailableTime = 0;
-
-    private int unAvailableCounter = 0;
-    private boolean isNeverExpire = false;
-    private boolean isWsrmPolicy;
-
-    public String subId;
-    public String curNotif;
-    private String localTopicString;
-    private String xpathString;
-    private String subscribeXml;
-
-    ConsumerInfo consumerInfo = null;
-
-    EndpointReference consumerReference;
-
-    URI consumerURI = null;
-
-    private OutGoingQueue outGoingQueue;
-
-    /**
-     * @return Returns the creationTime.
-     */
-    public long getCreationTime() {
-        return creationTime;
-    }
-
-    /**
-     * @param creationTime
-     *            The creationTime to set.
-     */
-    public void setCreationTime(long creationTime) {
-        this.creationTime = creationTime;
-    }
-
-    public void setId(String id) {
-        subId = id;
-    }
-
-    public String getId() {
-        return subId;
-    }
-
-    public boolean isWsrmPolicy() {
-        return isWsrmPolicy;
-    }
-
-    public void setWsrmPolicy(boolean wsrmPolicy) {
-        this.isWsrmPolicy = wsrmPolicy;
-    }
-
-    // TODO: outGoingQueue is not belong to this class. Move it to elsewhere
-    // related to notification handler in wsntAdapter.
-    public SubscriptionState(EndpointReference consumerRef, boolean useNotify, boolean wsrmEnabled, String topic,
-            String xpath, String type, OutGoingQueue outGoingQueue) {
-        this.consumerReference = consumerRef;
-        try {
-            this.consumerURI = new URI(consumerRef.getAddress());
-        } catch (URISyntaxException e) {
-            // this should not happen
-            logger.error("invalid consumer URI returned by axis om", e);
-
-        }
-        this.outGoingQueue = outGoingQueue;
-        // if (topic == null) {
-        // throw new IllegalArgumentException();
-        // }
-        this.localTopicString = topic;
-        this.xpathString = xpath;
-        this.isWsrmPolicy = wsrmEnabled;
-        consumerInfo = new ConsumerInfo(consumerRef.getAddress(), type, useNotify, false);
-
-    }
-
-    public void resume() {
-        consumerInfo.setPaused(false);
-    }
-
-    public void pause() {
-        consumerInfo.setPaused(true);
-    }
-
-    public String getConsumerIPAddressStr() {
-        return consumerURI.toString();
-    }
-
-    public URI getConsumerAddressURI() {
-        return consumerURI;
-    }
-
-    public String getLocalTopic() {
-        // QName topicExpressionQName =
-        // xsul.util.XsulUtil.toQName(localTopicString, localTopicString
-        // .requiredTextContent());
-        // String topicLocalString = topicExpressionQName.getLocalPart();
-        return localTopicString;
-    }
-
-    /**
-     * @return Returns the consumeReference.
-     */
-    public EndpointReference getConsumerReference() {
-        return consumerReference;
-    }
-
-    /**
-     * @return Returns the curNotif.
-     */
-    public String getCurNotif() {
-        return curNotif;
-    }
-
-    /**
-     * @param curNotif
-     *            The curNotif to set.
-     */
-    public void setCurNotif(String curNotif) {
-        this.curNotif = curNotif;
-    }
-
-    /**
-     * @return Returns the outGoingQueue.
-     */
-    public OutGoingQueue getOutGoingQueue() {
-        return outGoingQueue;
-    }
-
-    /**
-     * @param outGoingQueue
-     *            The outGoingQueue to set.
-     */
-    public void setOutGoingQueue(OutGoingQueue outGoingQueue) {
-        this.outGoingQueue = outGoingQueue;
-    }
-
-    /**
-     * @return Returns the consumerInfo.
-     */
-    public ConsumerInfo getConsumerInfo() {
-        return consumerInfo;
-    }
-
-    public void resetUnAvailableCounter() {
-        unAvailableCounter = 0;
-    }
-
-    public int addUnAvailableCounter() {
-        unAvailableCounter++;
-        return unAvailableCounter;
-    }
-
-    /**
-     * @return Returns the unAvailableCounter.
-     */
-    public int getUnAvailableCounter() {
-        return unAvailableCounter;
-    }
-
-    public String getXpathString() {
-        return xpathString;
-    }
-
-    public void setXpathString(String xpathString) {
-        this.xpathString = xpathString;
-    }
-
-    /**
-     * @return Returns the isNeverExpire.
-     */
-    public boolean isNeverExpire() {
-        return isNeverExpire;
-    }
-
-    /**
-     * @param isNeverExpire
-     *            The isNeverExpire to set.
-     */
-    public void setNeverExpire(boolean neverExpire) {
-        this.isNeverExpire = neverExpire;
-    }
-
-    /**
-     * @return Returns the lastAvailableTime.
-     */
-    public long getLastAvailableTime() {
-        return lastAvailableTime;
-    }
-
-    /**
-     * @param lastAvailableTime
-     *            The lastAvailableTime to set.
-     */
-    public void setLastAvailableTime(long lastAvailableTime) {
-        this.lastAvailableTime = lastAvailableTime;
-
-    }
-
-    public void setSubscribeXml(String xml) {
-        subscribeXml = xml;
-    }
-
-    public String getSubscribeXml() {
-        return subscribeXml;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof SubscriptionState) {
-            SubscriptionState subscription = (SubscriptionState) o;
-            return BrokerUtil.sameStringValue(subscription.getLocalTopic(), this.getLocalTopic())
-                    && BrokerUtil.sameStringValue(subscription.getXpathString(), this.getXpathString())
-                    && BrokerUtil.sameStringValue(subscription.getConsumerIPAddressStr(),
-                            this.getConsumerIPAddressStr()) && equalReferenceParameters(subscription);
-        }
-        return false;
-    }
-
-    private boolean equalReferenceParameters(SubscriptionState anotherSubscription) {
-
-        Map<QName, OMElement> otherRefProperties = anotherSubscription.getConsumerReference()
-                .getAllReferenceParameters();
-        Map<QName, OMElement> myRefProperties = getConsumerReference().getAllReferenceParameters();
-
-        /*
-         * Basic comparison
-         */
-        if (otherRefProperties == null && myRefProperties == null) {
-            return true;
-        } else if (otherRefProperties == null || myRefProperties == null) {
-            return false;
-        } else if (otherRefProperties.size() != myRefProperties.size()) {
-            return false;
-        }
-
-        /*
-         * This OMElementComparator supports ignore list, but we don't use it here.
-         */
-        Iterator<Entry<QName, OMElement>> iterator = otherRefProperties.entrySet().iterator();
-        while (iterator.hasNext()) {
-
-            Entry<QName, OMElement> entry = iterator.next();
-            if (!myRefProperties.containsKey(entry.getKey())) {
-                return false;
-            }
-
-            OMElement myElement = myRefProperties.get(entry.getKey());
-            OMElement otherElement = entry.getValue();
-
-            if (!OMElementComparator.compare(myElement, otherElement)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
deleted file mode 100644
index fe276bf..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProcessingContextBuilder.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.wseventing;
-
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.airavata.wsmg.util.WsEventingOperations;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPBody;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPHeader;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReferenceHelper;
-import org.apache.axis2.context.MessageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WSEProcessingContextBuilder extends ProcessingContextBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(WSEProcessingContextBuilder.class);
-
-    public ProcessingContext build(OMElement elem) {
-
-        ProcessingContext processingContext = new ProcessingContext();
-
-        if (elem != null && elem.getLocalName().equals("Subscribe")) {
-            logger.debug("found subscribe element");
-            onSubscription(processingContext, elem);
-
-        }
-
-        return processingContext;
-    }
-
-    public ProcessingContext build(SOAPEnvelope elem) {
-
-        ProcessingContext context = null;
-
-        SOAPBody soapBody = elem.getBody();
-        if (soapBody != null) {
-
-            context = build(soapBody.getFirstElement());
-
-        } else {
-            context = build((OMElement) null);
-        }
-
-        context.setEnvelope(elem);
-        extractInfoFromHeader(context, elem.getHeader());
-
-        return context;
-    }
-
-    public ProcessingContext build(MessageContext msgContext, WsEventingOperations operation) {
-
-        ProcessingContext processingContext = new ProcessingContext();
-
-        switch (operation) {
-        case SUBSCRIBE: {
-
-            Iterator<OMElement> iterator = msgContext.getEnvelope().getBody()
-                    .getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "Subscribe"));
-
-            if (!iterator.hasNext()) {
-                throw new RuntimeException("invalid subscription message - no subscribe element");
-            }
-
-            onSubscription(processingContext, iterator.next());
-        }
-            break;
-
-        }
-
-        processingContext.setMessageConext(msgContext);
-        processingContext.setEnvelope(msgContext.getEnvelope());
-        extractInfoFromHeader(processingContext, msgContext.getEnvelope().getHeader());
-        String topicFromUrl = BrokerUtil.getTopicFromRequestPath(msgContext.getTo().getAddress());
-
-        processingContext.setContextParameter(ContextParameters.TOPIC_FROM_URL, topicFromUrl);
-
-        return processingContext;
-    }
-
-    /**
-     * @param processingContext
-     * @param subscribeElement
-     */
-    private void onSubscription(ProcessingContext processingContext, OMElement subscribeElement) {
-
-        processingContext.setContextParameter(ContextParameters.SUBSCRIBE_ELEMENT, subscribeElement);
-
-        // -- check optional element - expires
-        Iterator iterator = subscribeElement.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
-                "Expires"));
-
-        if (iterator.hasNext()) {
-
-            processingContext.setContextParameter(ContextParameters.SUBSCRIBER_EXPIRES,
-                    ((OMElement) iterator.next()).getText());
-
-        }
-
-        iterator = subscribeElement
-                .getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "Filter"));
-
-        if (!iterator.hasNext()) {
-
-            throw new RuntimeException("invalid subscription - unable to find filter dialet");
-
-        }
-
-        processingContext.setContextParameter(ContextParameters.FILTER_ELEMENT, iterator.next());
-
-        iterator = subscribeElement.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
-                "Delivery"));
-
-        if (!iterator.hasNext()) {
-            throw new RuntimeException("invalid subscription - unable to find delivery tag");
-        }
-
-        OMElement delivery = (OMElement) iterator.next();
-
-        iterator = delivery.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(), "NotifyTo"));
-
-        if (!iterator.hasNext()) {
-            throw new RuntimeException("invalid subscription - unable to find NotifyTo tag");
-        }
-
-        OMElement notifyToElement = (OMElement) iterator.next();
-
-        processingContext.setContextParameter(ContextParameters.NOTIFY_TO_ELEMENT, notifyToElement);
-
-        try {
-
-            processingContext.setContextParameter(ContextParameters.NOTIFY_TO_EPR,
-                    EndpointReferenceHelper.fromOM(notifyToElement));
-
-        } catch (AxisFault e) {
-            throw new RuntimeException("invalid subscription - unable to parse notify to end point reference", e);
-        }
-
-    }
-
-    private void extractInfoFromHeader(ProcessingContext context, SOAPHeader header) {
-
-        Iterator ite = header.getChildrenWithName(new QName(NameSpaceConstants.WSE_NS.getNamespaceURI(),
-                WsmgCommonConstants.SUBSCRIPTION_ID));
-
-        if (ite.hasNext()) {
-            OMElement identifier = (OMElement) ite.next();
-            logger.debug("extracted identifier " + identifier.getText());
-
-            context.setContextParameter(ContextParameters.SUB_ID, identifier.getText());
-
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
deleted file mode 100644
index f17a980..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEProtocolSupport.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.wseventing;
-
-import java.util.Calendar;
-import java.util.Date;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.CommonRoutines;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.EndpointReferenceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WSEProtocolSupport {
-
-    private static final Logger log = LoggerFactory.getLogger(WSEProtocolSupport.class);
-
-    public SubscriptionState createSubscriptionState(ProcessingContext ctx, OutGoingQueue outgoingQueue)
-            throws AxisFault {
-
-        boolean neverExpire = false; // is true if expiration time is less than
-        String topicLocalString = "";
-        String xpathString = "";
-        EndpointReference consumerReference = ctx.getContextParameter(ContextParameters.NOTIFY_TO_EPR);
-
-        if (consumerReference == null) {
-            throw new AxisFault("Only Push delivery Mode (NotifyTo) is supported in WSE");
-        }
-
-        String expireTimeString = ctx.getContextParameter(ContextParameters.SUBSCRIBER_EXPIRES);
-
-        if (expireTimeString == null) {
-            neverExpire = true;
-
-        } else {
-
-            long expireTime = Long.valueOf(expireTimeString);
-            if (expireTime < 0) {
-                neverExpire = true;
-            }
-        }
-
-        OMElement filterEl = ctx.getContextParameter(ContextParameters.FILTER_ELEMENT);
-
-        if (filterEl == null) {
-
-            topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
-
-            if (topicLocalString == null) {
-                topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
-            }
-
-            log.debug("got topicLocalString=" + topicLocalString);
-            // topicLocalString = "wseTopic";
-            // // a special topic, used in WSNT and JMS. Do not use a
-            // wildcard topic here since wildcard string varies by system
-        } else {
-
-            String filterDialectAttrib = filterEl.getAttributeValue(new QName(null, "Dialect"));
-
-            if (filterDialectAttrib.compareTo(WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT) == 0) {
-                topicLocalString = BrokerUtil.getTopicLocalString(filterEl.getText()); // get what ever inside this
-                                                                                       // element
-
-                if (topicLocalString == null) {
-                    throw new AxisFault("topic is not given in the subscription");
-                }
-
-            } else if (filterDialectAttrib.compareTo(WsmgCommonConstants.XPATH_DIALECT) == 0) {
-
-                // use topicFromUrl if
-                // was provided
-                topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);
-
-                xpathString = filterEl.getText();
-
-                log.debug("got topicLocalString=" + topicLocalString + " xpathString=" + xpathString);
-
-                // TODO: Add XPath canonicalization here in the parsing. To
-                // generate a
-                // canonicalized XPath string
-                // Possibly use Query query =
-                // XPQuery.parseQuery(xpathExpression, index);
-                if (xpathString == null) {
-                    throw new AxisFault("xpath expression is not given");
-                }
-            } else if (filterDialectAttrib.compareTo(WsmgCommonConstants.TOPIC_AND_XPATH_DIALECT) == 0) {
-                OMElement topicEl = filterEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                        .getNamespaceURI(), "TopicExpression"));
-                if (topicEl != null) {
-                    topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
-                }
-                OMElement xpathEl = filterEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                        .getNamespaceURI(), "MessageContent"));
-                if (xpathEl != null) {
-                    xpathString = xpathEl.getText();
-                    if (xpathString == null && topicLocalString == null) {
-                        throw new AxisFault("Both topic string and " + "XPath String are null!");
-                    }
-                }
-            } else {
-                throw new AxisFault("Unkown dialect: ");
-                // topicLocalString = "wseTopic"; //a special topic, used in
-                // WSNT and JMS
-            }
-
-        }
-
-        if (topicLocalString == null || topicLocalString.length() == 0) {
-            topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
-        }
-
-        // Create SubscriptionState Object
-        SubscriptionState state = new SubscriptionState(consumerReference, true, false, topicLocalString, xpathString,
-                "wse", outgoingQueue);
-
-        state.setNeverExpire(neverExpire); // default false
-
-        return state;
-    }
-
-    public void createSubscribeResponse(ProcessingContext ctx, String subId) throws AxisFault {
-
-        OMFactory factory = OMAbstractFactory.getOMFactory();
-
-        ctx.addResponseMsgNameSpaces(NameSpaceConstants.WSE_NS);
-
-        OMElement responseMessage = factory.createOMElement("SubscribeResponse", NameSpaceConstants.WSE_NS);
-
-        OMElement identifier = factory.createOMElement(WsmgCommonConstants.SUBSCRIPTION_ID,
-                responseMessage.getNamespace());
-        identifier.setText(subId);
-        EndpointReference serviceLocationEndpointReference = new EndpointReference(ctx.getMessageContext()
-                .getAxisService().getEndpointURL());
-
-        serviceLocationEndpointReference.addReferenceParameter(identifier);
-
-        OMElement expiresEl = factory.createOMElement("Expires", responseMessage.getNamespace(), responseMessage);
-
-        Date expiration = getFutureExpirationDate();
-        String dateString = CommonRoutines.getXsdDateTime(expiration);
-        expiresEl.setText(dateString);
-
-        OMElement subscriptionManagerEpr = null;
-        try {
-
-            subscriptionManagerEpr = EndpointReferenceHelper.toOM(factory, serviceLocationEndpointReference, new QName(
-                    NameSpaceConstants.WSE_NS.getNamespaceURI(), "SubscriptionManager"), NameSpaceConstants.WSA_NS
-                    .getNamespaceURI());
-
-            responseMessage.addChild(subscriptionManagerEpr);
-            subscriptionManagerEpr.setNamespace(responseMessage.getNamespace());
-        } catch (AxisFault e) {
-            log.error("unable to resolve EPR from OM", e);
-            throw e;
-        }
-
-        ctx.setRespMessage(responseMessage);
-    }
-
-    private Date getFutureExpirationDate() {
-        // Get a Calendar for current locale and time zone
-        Calendar cal = Calendar.getInstance();
-        // currentDate.setDate(currentDate.getDate()+1);
-        // Get a Date object that represents 30 days from now
-        Date currentDate = new Date(); // Current date
-        cal.setTime(currentDate); // Set it in the Calendar object
-        cal.add(Calendar.DATE, 30); // Add 30 days
-        Date expiration = cal.getTime(); // Retrieve the resulting date
-        return expiration;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
deleted file mode 100644
index 6af3bc5..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingMsgReceiver.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.wseventing;
-
-import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.util.WsEventingOperations;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.MessageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * BrokerServiceMessageReceiverInOut message receiver
- */
-
-public class WSEventingMsgReceiver extends AbstractBrokerMsgReceiver {
-
-    private static final Logger log = LoggerFactory.getLogger(WSEventingMsgReceiver.class);
-    WSEProcessingContextBuilder builder = new WSEProcessingContextBuilder();
-
-    public MessageContext process(MessageContext inMsg, String operationName) throws AxisFault {
-
-        WsEventingOperations msgType = WsEventingOperations.valueFrom(operationName);
-        ProcessingContext processingContext = builder.build(inMsg, msgType);
-        MessageContext outputMsg = null;
-
-        log.debug("WS-Eventing: " + msgType);
-
-        switch (msgType) {
-        case SUBSCRIBE: {
-            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
-                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-            brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
-            outputMsg = createOutputMessageContext(inMsg, processingContext);
-            break;
-        }
-        case UNSUBSCRIBE: {
-            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
-                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-            brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
-            outputMsg = createOutputMessageContext(inMsg, processingContext);
-            break;
-        }
-        case RENEW:
-        case GET_STATUS:
-        case SUBSCRIPTION_END:
-        default:
-            throw new AxisFault("unsupported operation" + msgType.toString());
-
-        }
-        return outputMsg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
deleted file mode 100644
index 63c6bbc..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wseventing/WSEventingPublishMsgReceiver.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.wseventing;
-
-import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.util.WsEventingOperations;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.MessageContext;
-
-public class WSEventingPublishMsgReceiver extends AbstractBrokerMsgReceiver {
-
-    WSEProcessingContextBuilder builder = new WSEProcessingContextBuilder();
-
-    @Override
-    protected MessageContext process(MessageContext inMsgContext, String operationName) throws AxisFault {
-
-        ProcessingContext processingContext = builder.build(inMsgContext, WsEventingOperations.PUBLISH);
-
-        try {
-
-            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsgContext
-                    .getConfigurationContext().getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-            brokerConfigContext.getNotificationProcessor().processMsg(processingContext, NameSpaceConstants.WSE_NS);
-        } catch (Exception e) {
-            throw new AxisFault("unable to process message", e);
-        }
-        return createOutputMessageContext(inMsgContext, processingContext);
-    }
-
-    @Override
-    protected MessageContext createOutputMessageContext(MessageContext inMsg, ProcessingContext processingContext)
-            throws AxisFault {
-
-        MessageContext outputContext = null;
-
-        OMElement responseMessage = processingContext.getRespMessage();
-        if (responseMessage != null) {
-
-            outputContext = super.createOutputMessageContext(inMsg, processingContext);
-
-            String responseAction = String.format("%s/%s", NameSpaceConstants.WSE_NS.getNamespaceURI(),
-                    responseMessage.getLocalName());
-
-            outputContext.setSoapAction(responseAction);
-        }
-
-        return outputContext;
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
deleted file mode 100644
index c8d3e37..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNTProtocolSupport.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.wsnotification;
-
-import java.util.Map;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.EndpointReferenceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WSNTProtocolSupport {
-
-    private static final Logger log = LoggerFactory.getLogger(WSNTProtocolSupport.class);
-
-    public SubscriptionState createSubscriptionState(ProcessingContext ctx, OutGoingQueue outgoingQueue)
-            throws AxisFault {
-
-        EndpointReference consumerReference = ctx.getContextParameter(ContextParameters.NOTIFY_TO_EPR);
-
-        if (consumerReference == null) {
-            throw new AxisFault("Only Push delivery Mode (NotifyTo) is supported");
-        }
-
-        boolean neverExpire = false; // is true if expiration time is less than
-        boolean useNotify = true; // notify by event notifications
-        boolean wsrmEnabled = false;
-        String topicLocalString = "";
-        String xpathString = "";
-
-        String expireTimeString = ctx.getContextParameter(ContextParameters.SUBSCRIBER_EXPIRES);
-
-        if (expireTimeString == null) {
-            neverExpire = true;
-        } else {
-            long expireTime = Long.valueOf(expireTimeString);
-            if (expireTime < 0) {
-                neverExpire = true;
-            }
-        }
-
-        OMElement useNotifyEl = ctx.getContextParameter(ContextParameters.USE_NOTIFY_ELEMENT);
-        if (useNotifyEl != null) {
-            String s = useNotifyEl.getText();
-            useNotify = Boolean.valueOf(s);
-        }
-
-        // get policy if exist
-        OMElement element = ctx.getContextParameter(ContextParameters.SUB_POLICY);
-        if (element != null) {
-            wsrmEnabled = true;
-        }
-
-        OMElement topicExpressionEl = ctx.getContextParameter(ContextParameters.TOPIC_EXPRESSION_ELEMENT);
-
-        if (topicExpressionEl != null) {
-            topicLocalString = BrokerUtil.getTopicLocalString(topicExpressionEl.getText());
-        }
-
-        OMElement xpathEl = ctx.getContextParameter(ContextParameters.XPATH_ELEMENT);
-
-        if (xpathEl != null) {
-            xpathString = BrokerUtil.getXPathString(xpathEl);
-        }
-        if (xpathString == null && topicLocalString == null) {
-            throw new AxisFault("Both topic string and XPath String are null!");
-
-        }
-
-        if (topicLocalString == null || topicLocalString.length() == 0) {
-            topicLocalString = WsmgCommonConstants.WILDCARD_TOPIC;
-        }
-
-        // Create SubscriptionState Object
-        SubscriptionState state = new SubscriptionState(consumerReference, useNotify, wsrmEnabled, topicLocalString,
-                xpathString, "wsnt", outgoingQueue);
-
-        state.setNeverExpire(neverExpire); // default false
-
-        return state;
-    }
-
-    public void createSubscribeResponse(ProcessingContext ctx, String subId) throws AxisFault {
-
-        OMFactory factory = OMAbstractFactory.getOMFactory();
-
-        ctx.addResponseMsgNameSpaces(NameSpaceConstants.WSNT_NS);
-        OMElement responseMessage = factory.createOMElement("SubscribeResponse", NameSpaceConstants.WSNT_NS);
-
-        OMElement identifier = factory.createOMElement(WsmgCommonConstants.SUBSCRIPTION_ID,
-                responseMessage.getNamespace());
-        identifier.setText(subId);
-        EndpointReference serviceLocationEndpointReference = new EndpointReference(ctx.getMessageContext()
-                .getAxisService().getEndpointURL());
-        serviceLocationEndpointReference.addReferenceParameter(identifier);
-
-        OMElement subscriptionReference = null;
-        try {
-            subscriptionReference = EndpointReferenceHelper.toOM(factory, serviceLocationEndpointReference, new QName(
-                    "SubscriptionReference"), NameSpaceConstants.WSA_NS.getNamespaceURI());
-
-            responseMessage.addChild(subscriptionReference);
-            subscriptionReference.setNamespace(responseMessage.getNamespace());
-
-        } catch (AxisFault e) {
-            log.error("unable to resolve EPR from OM", e);
-            throw e;
-        }
-
-        ctx.setRespMessage(responseMessage);
-    }
-
-    public static class Client {
-
-        public static OMElement createSubscriptionMsg(EndpointReference eventSinkLocation, String topicExpression,
-                String xpathExpression) throws AxisFault {
-            OMFactory factory = OMAbstractFactory.getOMFactory();
-
-            OMElement message = factory.createOMElement("SubscribeRequest", NameSpaceConstants.WSNT_NS);
-
-            if (topicExpression != null) {
-                OMElement topicExpEl = factory.createOMElement("TopicExpression", NameSpaceConstants.WSNT_NS, message);
-
-                topicExpEl.addAttribute("Dialect", WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT,
-                        NameSpaceConstants.WSNT_NS);
-                topicExpEl.declareNamespace(NameSpaceConstants.WIDGET_NS);
-                topicExpEl.setText(NameSpaceConstants.WIDGET_NS.getPrefix() + ":" + topicExpression);
-            }
-
-            if (xpathExpression != null) {
-                OMElement xpathExpEl = factory.createOMElement("Selector", NameSpaceConstants.WSNT_NS, message);
-                xpathExpEl.addAttribute("Dialect", WsmgCommonConstants.XPATH_DIALECT, null);
-                xpathExpEl.setText(xpathExpression);
-            }
-
-            OMElement useNotifyEl = factory.createOMElement("UseNotify", message.getNamespace(), message);
-            useNotifyEl.setText("true");// check wether we still need this
-
-            OMElement eprCrEl = EndpointReferenceHelper.toOM(factory, eventSinkLocation,
-                    new QName("ConsumerReference"), NameSpaceConstants.WSA_NS.getNamespaceURI());
-
-            message.addChild(eprCrEl);
-            eprCrEl.setNamespace(message.getNamespace());
-
-            return message;
-        }
-
-        public static String decodeSubscriptionResponse(OMElement subscriptionReference) throws AxisFault {
-
-            String subscriptionId = null;
-
-            EndpointReference subscriptionReferenceEPR = EndpointReferenceHelper.fromOM(subscriptionReference);
-
-            Map<QName, OMElement> referenceParams = subscriptionReferenceEPR.getAllReferenceParameters();
-
-            if (referenceParams != null) {
-                QName identifierQName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(),
-                        WsmgCommonConstants.SUBSCRIPTION_ID);
-
-                OMElement identifierEl = referenceParams.get(identifierQName);
-                subscriptionId = (identifierEl != null) ? identifierEl.getText() : null;
-
-            }
-
-            return subscriptionId;
-        }
-
-        public static OMElement createUnsubscribeMsg() {
-            OMFactory factory = OMAbstractFactory.getOMFactory();
-            OMElement message = factory.createOMElement("UnsubsribeRequest", NameSpaceConstants.WSNT_NS);
-
-            return message;
-        }
-
-        public static OMElement encodeNotification(String topic, OMElement message, EndpointReference producerReference)
-                throws AxisFault {
-            OMFactory factory = OMAbstractFactory.getOMFactory();
-
-            OMElement topicExpEl = factory.createOMElement("Topic", NameSpaceConstants.WSNT_NS);
-            topicExpEl.addAttribute("Dialect", WsmgCommonConstants.TOPIC_EXPRESSION_SIMPLE_DIALECT, null);
-            topicExpEl.declareNamespace(NameSpaceConstants.WIDGET_NS);
-            topicExpEl.setText(NameSpaceConstants.WIDGET_NS.getPrefix() + ":" + topic);
-
-            OMElement messageToNotify = factory.createOMElement("Notify", NameSpaceConstants.WSNT_NS);
-            messageToNotify.declareNamespace(NameSpaceConstants.WSNT_NS);
-            messageToNotify.declareNamespace(NameSpaceConstants.WSA_NS);
-            OMElement notificationMesssageEl = factory.createOMElement("NotificationMessage",
-                    messageToNotify.getNamespace(), messageToNotify);
-
-            notificationMesssageEl.addChild(topicExpEl);
-
-            notificationMesssageEl.addChild(EndpointReferenceHelper.toOM(factory, producerReference, new QName(
-                    notificationMesssageEl.getNamespace().getNamespaceURI(), "ProducerReference",
-                    notificationMesssageEl.getNamespace().getPrefix()), NameSpaceConstants.WSA_NS.getNamespaceURI()));
-
-            OMElement messageEl = factory.createOMElement("Message", notificationMesssageEl.getNamespace(),
-                    notificationMesssageEl);
-
-            messageEl.addChild(message);
-            return messageToNotify;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
deleted file mode 100644
index 9d32ee9..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationMsgReceiver.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.wsnotification;
-
-import org.apache.airavata.wsmg.broker.AbstractBrokerMsgReceiver;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
-import org.apache.airavata.wsmg.util.WsNotificationOperations;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.context.MessageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * BrokerServiceMessageReceiverInOut message receiver
- */
-
-public class WSNotificationMsgReceiver extends AbstractBrokerMsgReceiver {
-
-    private static final Logger log = LoggerFactory.getLogger(WSNotificationMsgReceiver.class);
-
-    WSNotificationProcessingContextBuilder builder = new WSNotificationProcessingContextBuilder();
-
-    public MessageContext process(MessageContext inMsg, String operationName) throws AxisFault {
-
-        WsNotificationOperations msgType = WsNotificationOperations.valueFrom(operationName);
-
-        ProcessingContext processingContext = builder.build(inMsg, msgType);
-
-        MessageContext outputMsg = null;
-
-        switch (msgType) {
-        case NOTIFY: {
-            try {
-
-                WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg
-                        .getConfigurationContext().getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-                brokerConfigContext.getNotificationProcessor()
-                        .processMsg(processingContext, NameSpaceConstants.WSNT_NS);
-            } catch (Exception e) {
-                throw new AxisFault("unable to process message", e);
-            }
-            outputMsg = createOutputMessageContext(inMsg, processingContext);
-            break;
-        }
-        case SUBSCRIBE: {
-
-            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
-                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-            brokerConfigContext.getSubscriptionManager().subscribe(processingContext);
-            outputMsg = createOutputMessageContext(inMsg, processingContext);
-            break;
-        }
-        case UNSUBSCRIBE: {
-
-            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
-                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-            brokerConfigContext.getSubscriptionManager().unsubscribe(processingContext);
-            outputMsg = createOutputMessageContext(inMsg, processingContext);
-            break;
-        }
-        case RESUME_SUBSCRIPTION: {
-
-            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
-                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-            brokerConfigContext.getSubscriptionManager().resumeSubscription(processingContext);
-            outputMsg = createOutputMessageContext(inMsg, processingContext);
-            break;
-        }
-        case PAUSE_SUBSCRIPTION: {
-            WsmgConfigurationContext brokerConfigContext = (WsmgConfigurationContext) inMsg.getConfigurationContext()
-                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-            brokerConfigContext.getSubscriptionManager().pauseSubscription(processingContext);
-            outputMsg = createOutputMessageContext(inMsg, processingContext);
-            break;
-        }
-        case GET_CURRENT_MSG:
-        default:
-            throw new AxisFault("not implemented yet");
-        }
-
-        return outputMsg;
-    }
-
-}// end of class

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
deleted file mode 100644
index b517234..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/wsnotification/WSNotificationProcessingContextBuilder.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.broker.wsnotification;
-
-import java.util.Iterator;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.broker.context.ContextParameters;
-import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.broker.context.ProcessingContextBuilder;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.airavata.wsmg.util.WsNotificationOperations;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.soap.SOAPBody;
-import org.apache.axiom.soap.SOAPEnvelope;
-import org.apache.axiom.soap.SOAPHeader;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.addressing.EndpointReferenceHelper;
-import org.apache.axis2.context.MessageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WSNotificationProcessingContextBuilder extends ProcessingContextBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(WSNotificationProcessingContextBuilder.class);
-
-    public ProcessingContext build(OMElement elem) {
-
-        ProcessingContext processingContext = new ProcessingContext();
-
-        if (elem == null
-                || (!elem.getNamespace().getNamespaceURI().equals(NameSpaceConstants.WSNT_NS.getNamespaceURI()))) {
-
-            logger.warn("invalid message payload recieved: " + elem);
-
-            return processingContext;
-        }
-
-        String localName = elem.getLocalName();
-
-        if (localName.equals("SubscribeRequest")) {
-            onSubscription(processingContext, elem);
-        }
-        return processingContext;
-    }
-
-    public void onSubscription(ProcessingContext context, OMElement subscribeElement) {
-        context.setContextParameter(ContextParameters.SUBSCRIBE_ELEMENT, subscribeElement);
-
-        OMElement consumerReference = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                .getNamespaceURI(), "ConsumerReference"));
-        if (consumerReference == null) {
-            logger.warn("unable to find consumer reference" + " in subscribe message: " + subscribeElement);
-            return;
-        }
-        context.setContextParameter(ContextParameters.NOTIFY_TO_ELEMENT, consumerReference);
-
-        try {
-            EndpointReference consumerEpr = EndpointReferenceHelper.fromOM(consumerReference);
-            context.setContextParameter(ContextParameters.NOTIFY_TO_EPR, consumerEpr);
-
-        } catch (AxisFault e) {
-            logger.warn("invalid epr", e);
-            return;
-        }
-
-        OMElement topicExpression = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                .getNamespaceURI(), "TopicExpression"));
-
-        if (topicExpression != null) { // topic can be null
-
-            context.setContextParameter(ContextParameters.TOPIC_EXPRESSION_ELEMENT, topicExpression);
-
-        }
-
-        OMElement useNotify = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                .getNamespaceURI(), "UseNotify"));
-
-        if (useNotify != null) {
-            context.setContextParameter(ContextParameters.USE_NOTIFY_ELEMENT, useNotify);
-        }
-
-        OMElement selector = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                .getNamespaceURI(), "Selector"));
-
-        if (selector != null) {
-            context.setContextParameter(ContextParameters.XPATH_ELEMENT, selector);
-        }
-
-        OMElement subscriptionPolicy = subscribeElement.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
-                .getNamespaceURI(), WsmgCommonConstants.SUBSCRIPTION_POLICY));
-
-        if (subscriptionPolicy != null) {
-            context.setContextParameter(ContextParameters.SUB_POLICY, subscriptionPolicy);
-        }
-
-    }
-
-    public ProcessingContext build(MessageContext msgContext, WsNotificationOperations operation) {
-        ProcessingContext context = new ProcessingContext();
-
-        SOAPEnvelope soapEnvelope = msgContext.getEnvelope();
-
-        if (soapEnvelope == null) {
-            throw new RuntimeException("invalid message context - envelope is not found");
-        }
-
-        SOAPBody soapBody = soapEnvelope.getBody();
-
-        if (soapBody == null) {
-            throw new RuntimeException("invalid message context - soap envelope is not found");
-        }
-
-        SOAPHeader soapHeader = soapEnvelope.getHeader();
-
-        if (soapHeader == null) {
-            throw new RuntimeException("invalid message context - soap header is not found");
-        }
-
-        switch (operation) {
-        case SUBSCRIBE: {
-
-            Iterator<OMElement> iterator = soapBody.getChildrenWithName(new QName(NameSpaceConstants.WSNT_NS
-                    .getNamespaceURI(), "SubscribeRequest"));
-            if (!iterator.hasNext()) {
-                throw new RuntimeException("invalid message context - unable to find Subscribe information");
-            }
-
-            onSubscription(context, iterator.next());
-        }
-            break;
-        }
-
-        context.setEnvelope(soapEnvelope);
-        extractInfoFromHeader(context, soapHeader);
-        context.setMessageConext(msgContext);
-        String topicFromUrl = BrokerUtil.getTopicFromRequestPath(msgContext.getTo().getAddress());
-        context.setContextParameter(ContextParameters.TOPIC_FROM_URL, topicFromUrl);
-
-        return context;
-    }
-
-    private void extractInfoFromHeader(ProcessingContext context, SOAPHeader header) {
-
-        Iterator ite = header.getChildrenWithName(new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(),
-                WsmgCommonConstants.SUBSCRIPTION_ID));
-        if (ite.hasNext()) {
-            OMElement identifier = (OMElement) ite.next();
-            logger.debug("extracted identifier " + identifier.getText());
-
-            context.setContextParameter(ContextParameters.SUB_ID, identifier.getText());
-
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
deleted file mode 100644
index 73eba17..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/OutGoingMessage.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-
-public class OutGoingMessage implements Serializable {
-    /**
-	 * 
-	 */
-    private static final long serialVersionUID = -6999667921413261492L;
-
-    String textMessage;
-
-    AdditionalMessageContent additionalMessageContent;
-
-    List<ConsumerInfo> consumerInfoList = null;
-
-    // ConsumerInfo consumerInfo=null;
-
-    /**
-	 * 
-	 */
-    public OutGoingMessage() {
-        // super();
-        // TODO Auto-generated constructor stub
-        // consumerInfo=new ConsumerInfo();
-    }
-
-    /**
-     * @param textMessage
-     * @param additionalMessageContent
-     * @param consumerInfoList
-     */
-    public OutGoingMessage(String textMessage, AdditionalMessageContent additionalMessageContent,
-            List<ConsumerInfo> consumerInfoList) {
-        super();
-        // TODO Auto-generated constructor stub
-        this.textMessage = textMessage;
-        this.additionalMessageContent = additionalMessageContent;
-        this.consumerInfoList = consumerInfoList;
-    }
-
-    /**
-     * @param consumerInfo
-     *            The consumerInfo to set.
-     */
-    public void addConsumerInfo(ConsumerInfo consumerInfo) {
-        // this.consumerInfo = consumerInfo;
-        if (consumerInfoList == null) {
-            consumerInfoList = new ArrayList<ConsumerInfo>();
-        }
-        consumerInfoList.add(consumerInfo);
-    }
-
-    /**
-     * @return Returns the textMessage.
-     */
-    public String getTextMessage() {
-        return textMessage;
-    }
-
-    /**
-     * @param textMessage
-     *            The textMessage to set.
-     */
-    public void setTextMessage(String textMessage) {
-        this.textMessage = textMessage;
-    }
-
-    /**
-     * @return Returns the consumerInfoList.
-     */
-    public List<ConsumerInfo> getConsumerInfoList() {
-        return consumerInfoList;
-    }
-
-    /**
-     * @param consumerInfoList
-     *            The consumerInfoList to set.
-     */
-    public void setConsumerInfoList(List<ConsumerInfo> consumerInfoList) {
-        this.consumerInfoList = consumerInfoList;
-    }
-
-    /**
-     * @return Returns the soapHeader.
-     */
-    public AdditionalMessageContent getAdditionalMessageContent() {
-        return additionalMessageContent;
-    }
-
-    /**
-     * @param soapHeader
-     *            The soapHeader to set.
-     */
-    public void setAdditionalMessageContent(AdditionalMessageContent soapHeader) {
-        this.additionalMessageContent = soapHeader;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/0e2c10f5/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
deleted file mode 100644
index 6b051b1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.storage;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-
-public class WsmgInMemoryStorage implements WsmgStorage, WsmgQueue {
-
-    private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
-
-    private Map<String, SubscriptionState> expirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
-
-    private Map<String, SubscriptionState> unexpirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
-
-    public void dispose() {
-        queue.clear();
-        expirableSubscriptions.clear();
-        unexpirableSubscriptions.clear();
-    }
-
-    public int insert(SubscriptionState subscription) {
-        if (subscription.isNeverExpire()) {
-            unexpirableSubscriptions.put(subscription.getId(), subscription);
-        } else {
-            expirableSubscriptions.put(subscription.getId(), subscription);
-        }
-        return 0;
-    }
-
-    public int delete(String subscriptionId) {
-        expirableSubscriptions.remove(subscriptionId);
-        unexpirableSubscriptions.remove(subscriptionId);
-        return 0;
-    }
-
-    public List<SubscriptionEntry> getAllSubscription() {
-
-        List<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>(expirableSubscriptions.size()
-                + unexpirableSubscriptions.size());
-
-        Collection<SubscriptionState> entries = expirableSubscriptions.values();
-
-        for (SubscriptionState s : entries) {
-            SubscriptionEntry se = new SubscriptionEntry();
-            se.setSubscribeXml(s.getSubscribeXml());
-            se.setSubscriptionId(s.getId());
-            ret.add(se);
-        }
-        entries = unexpirableSubscriptions.values();
-        for (SubscriptionState s : entries) {
-            SubscriptionEntry se = new SubscriptionEntry();
-            se.setSubscribeXml(s.getSubscribeXml());
-            se.setSubscriptionId(s.getId());
-            ret.add(se);
-        }
-
-        return ret;
-    }
-
-    public Object blockingDequeue() {
-        Object obj = null;
-
-        try {
-            obj = queue.take();
-        } catch (InterruptedException ie) {
-            throw new RuntimeException("interruped exception occured", ie);
-        }
-
-        return obj;
-    }
-
-    public void cleanup() {
-        queue.clear();
-    }
-
-    public void enqueue(Object object, String trackId) {
-        queue.offer(object);
-    }
-}